-- SPDX-FileCopyrightText: 2022-23 Steffen Jost
--
-- SPDX-License-Identifier: AGPL-3.0-or-later

module Jobs.Handler.SynchroniseAvs
  ( dispatchJobSynchroniseAvs
  -- , dispatchJobSynchroniseAvsId
  -- , dispatchJobSynchroniseAvsUser
  , dispatchJobSynchroniseAvsQueue
  ) where

import Import

import qualified Data.Set as Set
import qualified Data.Conduit.List as C

import Database.Esqueleto.Experimental ((:&)(..))
import qualified Database.Esqueleto.Experimental as E -- needs TypeApplications Lang-Pragma
-- import qualified Database.Esqueleto.Legacy as E hiding (upsert)
-- import qualified Database.Esqueleto.PostgreSQL as E
import qualified Database.Esqueleto.Utils as E

import Jobs.Queue

import Handler.Utils.Avs


-- | Pick the users that belong to the current iteration and schedule them for
-- an AVS synchronisation.
--
-- Every user key is hashed together with @epoch@ and reduced modulo
-- @numIterations@; a user is picked when that value equals
-- @iteration `mod` numIterations@. For each picked user an 'AvsSync' record
-- holding the current time and @pause@ is written via 'putMany'. Afterwards a
-- summary is logged and a 'JobSynchroniseAvsQueue' job is queued.
--
-- @pause@ is a date in the past; don't synch again if the last synch was after pause.
dispatchJobSynchroniseAvs :: Natural -> Natural -> Natural -> Maybe Day -> JobHandler UniWorX
dispatchJobSynchroniseAvs numIterations epoch iteration pause = JobHandlerException $ runDB $ do
    now   <- liftIO getCurrentTime
    todos <- runConduit $ allUserIds .| dueNow now .| sinkList
    putMany todos
    $logInfoS "SynchronisAvs" [st|AVS synch summary for #{tshow numIterations}/#{tshow epoch}/#{tshow iteration}: #{length todos}|]
    void (queueJob JobSynchroniseAvsQueue)
  where
    -- Source of all user keys; no filters and no select options are given.
    allUserIds :: ConduitT () UserId _ ()
    allUserIds = selectKeys [] []

    -- Keep only the users scheduled for this iteration, turning each of them
    -- into the 'AvsSync' record that is to be stored.
    dueNow :: UTCTime -> ConduitT UserId AvsSync _ ()
    dueNow now = C.mapMaybeM (runMaybeT . scheduleUser now)

    -- Decide for a single user: log the bucket the user falls into, then yield
    -- an 'AvsSync' record only if that bucket is the current one.
    scheduleUser now userId = do
      let userIteration, currentIteration :: Integer
          userIteration    = toInteger (hash epoch `hashWithSalt` userId) `mod` toInteger numIterations
          currentIteration = toInteger iteration `mod` toInteger numIterations
      $logDebugS "SynchronisAvs" [st|User ##{tshow (fromSqlKey userId)}: AVS sync on #{tshow userIteration}/#{tshow numIterations}, now #{tshow currentIteration}|]
      AvsSync userId now pause <$ guard (userIteration == currentIteration)


-- dispatchJobSynchroniseAvsId :: AvsPersonId -> Maybe Day -> JobHandler UniWorX
-- dispatchJobSynchroniseAvsId apid pause = JobHandlerException $
--   maybeM insertUnknown processKnown $ runDB $ getBy (UniqueUserAvsId apid)
--   where
--     processKnown Entity{entityVal=UserAvs{userAvsUser=uid}} = workJobSychronizeAvs uid pause
--     insertUnknown = void $ maybeCatchAll $ Just <$> upsertAvsUserById apid

-- dispatchJobSynchroniseAvsUser :: UserId -> Maybe Day -> JobHandler UniWorX
-- dispatchJobSynchroniseAvsUser uid pause = JobHandlerException $ workJobSychronizeAvs uid pause

-- workJobSychronizeAvs :: UserId -> Maybe Day -> Handler ()
-- workJobSychronizeAvs uid pause = do
--   now <- liftIO getCurrentTime
--   -- void $ E.upsert
--   --   AvsSync { avsSyncUser = uid
--   --           , avsSyncCreationTime = now
--   --           , avsSyncPause = pause
--   --           }
--   --   [ \oldSync -> (AvsSyncPause E.=. E.greatest (E.val pause) (oldSync E.^. AvsSyncPause)) oldSync ] -- causes Esqueleto to call undefined at Database.Esqueleto.Internal.Internal.renderUpdates:1308
--   runDB $ maybeM
--     (insert_ AvsSync{avsSyncUser=uid, avsSyncCreationTime=now, avsSyncPause=pause})
--     (\Entity{entityKey=asid, entityVal=AvsSync{avsSyncPause=oldPause}} ->
--         update asid [AvsSyncPause =. max pause oldPause, AvsSyncCreationTime =. now])
--     (getBy $ UniqueAvsSyncUser uid)
--   void $ queueJob JobSynchroniseAvsQueue

-- dispatchJobSynchroniseAvsQueue :: JobHandler UniWorX
-- dispatchJobSynchroniseAvsQueue = JobHandlerException $ do
--   (unlinked,linked) <- runDB $ do
--     jobs <- E.select (do
--       (avsSync :& usrAvs) <- E.from $ E.table @AvsSync
--         `E.leftJoin` E.table @UserAvs
--         `E.on` (\(avsSync :& usrAvs) -> avsSync E.^. AvsSyncUser E.=?. usrAvs E.?. UserAvsUser)
--       let pause = avsSync E.^. AvsSyncPause
--           lastSync = usrAvs E.?. UserAvsLastSynch
--       E.where_ $ E.isNothing pause
--             E.||. E.isNothing lastSync
--             E.||. pause E.>. E.dayMaybe lastSync
--       return (avsSync E.^. AvsSyncId, avsSync E.^. AvsSyncUser, usrAvs E.?.
--               UserAvsPersonId)
--       )
--     let (syncIds, unlinked, linked) = foldl' discernJob mempty jobs
--     E.deleteWhere [AvsSyncId <-. syncIds]
--     return (unlinked, linked)
--   void $ updateAvsUserByIds linked
--   void $ linktoAvsUserByUIDs unlinked
--   -- we do not reschedule failed synchs here in order to avoid a loop
--   where
--     discernJob (accSync, accUid, accApi) (E.Value k, _, E.Value (Just api)) = (k:accSync, accUid, Set.insert api accApi)
--     discernJob (accSync, accUid, accApi) (E.Value k, E.Value uid, E.Value Nothing ) = (k:accSync, Set.insert uid accUid, accApi)


-- | Work off the queued AVS synchronisation requests.
--
-- Inside one database action the due 'AvsSync' rows are read (see @dueSyncs@)
-- and the 'AvsSync' table is truncated afterwards. The rows read are then split
-- into users that have an AVS person id (@linked@) and users that have none
-- (@unlinked@); the former are handed to 'updateAvsUserByIds', the latter to
-- 'linktoAvsUserByUIDs'. A log line is written before and after these calls.
dispatchJobSynchroniseAvsQueue :: JobHandler UniWorX
dispatchJobSynchroniseAvsQueue = JobHandlerException $ do
    jobs <- runDB $ do
      due <- E.select dueSyncs
      now <- liftIO getCurrentTime
      -- The record serves only to name the table to truncate; its user field is never meant to be inspected.
      E.truncateTable $ AvsSync (error "truncateTable: AvsSyncUser not needed") now Nothing
      pure due
    let (unlinked, linked) = foldl' sortJob mempty jobs
    $logInfoS "SynchronisAvs" [st|AVS synch performing for #{length linked} AVS linked users and #{length unlinked} unlinked users|]
    void (updateAvsUserByIds linked)
    void (linktoAvsUserByUIDs unlinked)
    $logInfoS "SynchronisAvs" [st|AVS synch performed for #{length linked} AVS linked users and #{length unlinked} unlinked users|]
    -- we do not reschedule failed synchs here in order to avoid a loop
  where
    -- 'AvsSync' rows left-joined with 'UserAvs' on the user; a row is due when
    -- it has no pause date, when there is no last synch, or when the pause
    -- date lies after the day of the last synch.
    dueSyncs = do
      (syncRow :& avsRow) <- E.from $
        E.table @AvsSync
          `E.leftJoin` E.table @UserAvs
          `E.on` (\(s :& a) -> s E.^. AvsSyncUser E.=?. a E.?. UserAvsUser)
      let pause    = syncRow E.^. AvsSyncPause
          lastSync = avsRow E.?. UserAvsLastSynch
      E.where_ $ E.isNothing pause
           E.||. E.isNothing lastSync
           E.||. pause E.>. E.dayMaybe lastSync
      pure (syncRow E.^. AvsSyncUser, avsRow E.?. UserAvsPersonId)

    -- Fold step: a row with an AVS person id adds that id to the second set,
    -- a row without one adds the user id to the first set.
    sortJob (uids, apis) (E.Value uid, E.Value mbApi) = case mbApi of
      Just api -> (uids, Set.insert api apis)
      Nothing  -> (Set.insert uid uids, apis)