diff --git a/package.yaml b/package.yaml index 19732212b..61be0c32e 100644 --- a/package.yaml +++ b/package.yaml @@ -135,6 +135,7 @@ dependencies: - cassava-conduit - constraints - memory + - pqueue other-extensions: - GeneralizedNewtypeDeriving diff --git a/src/Database/Persist/Class/Instances.hs b/src/Database/Persist/Class/Instances.hs index 23209a44b..2dbb2bfb0 100644 --- a/src/Database/Persist/Class/Instances.hs +++ b/src/Database/Persist/Class/Instances.hs @@ -21,3 +21,6 @@ instance PersistEntity record => Binary (Key record) where put = Binary.put . toPersistValue putList = Binary.putList . map toPersistValue get = either (fail . unpack) return . fromPersistValue =<< Binary.get + +instance PersistEntity record => NFData (Key record) where + rnf = rnf . keyToValues diff --git a/src/Database/Persist/Types/Instances.hs b/src/Database/Persist/Types/Instances.hs index eb02f5a22..0929a2886 100644 --- a/src/Database/Persist/Types/Instances.hs +++ b/src/Database/Persist/Types/Instances.hs @@ -20,3 +20,4 @@ deriving instance Typeable PersistValue instance Hashable PersistValue instance Binary PersistValue +instance NFData PersistValue diff --git a/src/Jobs.hs b/src/Jobs.hs index 7d9a1ea3d..e5fee67d6 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -145,7 +145,7 @@ manageJobPool foundation@UniWorX{..} endo <- execWriterT . replicateM_ missing $ do workerId <- newWorkerId let logIdent = mkLogIdent workerId - (bChan, chan) <- atomically $ newBroadcastTChan >>= (\c -> (c, ) <$> dupTChan c) + chan <- liftIO $ newTVarIO mempty let streamChan = join . atomically $ do shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown @@ -153,7 +153,10 @@ manageJobPool foundation@UniWorX{..} | shouldTerminate -> return $ return () | otherwise -> do - nextVal <- readTChan chan + queue <- readTVar chan + nextVal <- case jqDequeue queue of + Nothing -> retry + Just (j, q) -> j <$ writeTVar chan q return $ yield nextVal >> streamChan runWorker = unsafeHandler foundation . flip runReaderT (jobContext oldState) $ do $logInfoS logIdent "Started" @@ -162,7 +165,7 @@ manageJobPool foundation@UniWorX{..} worker <- allocateLinkedAsync runWorker tell . Endo $ \cSt -> cSt - { jobWorkers = Map.insert worker bChan $ jobWorkers cSt + { jobWorkers = Map.insert worker chan $ jobWorkers cSt , jobWorkerName = \a -> bool (jobWorkerName cSt a) workerId $ a == worker } atomically . putTMVar appJobState $ endo `appEndo` oldState @@ -181,12 +184,14 @@ manageJobPool foundation@UniWorX{..} void . lift . allocateLinkedAsync $ let go = do next <- evalRandTIO . mapRandT (liftIO . atomically) . runMaybeT $ do - nextVal <- MaybeT . lift . tryReadTChan $ jobWorkers oldState ! jobAsync + let chan = jobWorkers oldState ! jobAsync + (nextVal, newQueue) <- MaybeT . lift . fmap jqDequeue $ readTVar chan + lift . lift $ writeTVar chan newQueue jobWorkers' <- lift . lift $ jobWorkers <$> readTMVar appJobState receiver <- maybe (lift $ lift retry) return =<< uniformMay jobWorkers' return (nextVal, receiver) whenIsJust next $ \(nextVal, receiver) -> do - atomically $ writeTChan receiver nextVal + atomically . modifyTVar' receiver $ jqInsert nextVal go in go @@ -216,15 +221,15 @@ execCrontab :: RWST JobContext () (HashMap JobCtl (Max UTCTime)) (HandlerT UniWo execCrontab = do mapRWST (liftHandlerT . runDB . setSerializable) $ do let - mergeLastExec (Entity leId CronLastExec{..}) + mergeLastExec (Entity _leId CronLastExec{..}) | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max cronLastExecTime) - | otherwise = lift $ delete leId + | otherwise = return () - mergeQueued (Entity qjId QueuedJob{..}) + mergeQueued (Entity _qjId QueuedJob{..}) | Just job <- Aeson.parseMaybe parseJSON queuedJobContent = State.modify $ HashMap.insertWith (<>) (JobCtlQueue job) (Max queuedJobCreationTime) - | otherwise = lift $ delete qjId + | otherwise = return () runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeLastExec runConduit $ transPipe lift (selectSource [] []) .| C.mapM_ mergeQueued @@ -341,7 +346,7 @@ handleJobs' wNum = C.mapM_ $ \jctl -> do handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) - handleCmd JobCtlNoOp = return () + handleCmd JobCtlTest = return () handleCmd JobCtlFlush = void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (lift . writeJobCtl . JobCtlPerform) handleCmd (JobCtlQueue job) = lift $ queueJob' job handleCmd (JobCtlPerform jId) = lift . handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do @@ -439,10 +444,10 @@ pruneLastExecs crontab = runConduit $ selectSource [] [] .| C.mapM_ ensureCronta | abs (now `diffUTCTime` cronLastExecTime) > flushInterval * 2 -> return () | Just job <- Aeson.parseMaybe parseJSON cronLastExecJob - , HashMap.member (JobCtlQueue job) crontab - -> return () - | otherwise + , not $ HashMap.member (JobCtlQueue job) crontab -> lift $ delete leId + | otherwise + -> return () determineCrontab' :: DB (Crontab JobCtl) determineCrontab' = (\ct -> ct <$ pruneLastExecs ct) =<< determineCrontab diff --git a/src/Jobs/HealthReport.hs b/src/Jobs/HealthReport.hs index 90c838963..157934cf1 100644 --- a/src/Jobs/HealthReport.hs +++ b/src/Jobs/HealthReport.hs @@ -159,7 +159,7 @@ dispatchHealthCheckActiveJobExecutors = HealthActiveJobExecutors <$> do in fromInteger micro $logDebugS "HealthCheckActiveJobExecutors" . tshow . map showWorkerId $ Map.elems workers' responders <- fmap (getSum . fold) . liftIO . forConcurrently (Map.toList workers) $ \(_, wName) - -> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlNoOp) + -> fromMaybe (Sum 0) <$> timeout timeoutMicro (runReaderT ?? app $ Sum 1 <$ writeJobCtlBlock' (writeJobCtl' wName) JobCtlTest) if | Map.null workers -> return Nothing | otherwise -> return . Just $ responders % fromIntegral (Map.size workers) diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index 7463dea78..1882f0fad 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -44,7 +44,7 @@ writeJobCtl' target cmd = do | null jobWorkers -> throwM JobQueuePoolEmpty | [(_, chan)] <- filter ((== target) . jobWorkerName . view _1) $ Map.toList jobWorkers - -> atomically $ writeTChan chan cmd + -> atomically . modifyTVar' chan $ jqInsert cmd | otherwise -> throwM JobQueueWorkerNotFound diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index 7aa1a3237..871f8d38e 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} + module Jobs.Types ( Job(..), Notification(..) , JobCtl(..) @@ -6,6 +8,8 @@ module Jobs.Types , jobWorkerNames , JobWorkerId , showWorkerId, newWorkerId + , JobQueue, jqInsert, jqDequeue + , JobPriority(..), prioritiseJob ) where import Import.NoFoundation hiding (Unique) @@ -20,6 +24,9 @@ import Data.Unique import qualified Data.Map.Strict as Map import qualified Data.Set as Set +import Data.PQueue.Prio.Max (MaxPQueue) +import qualified Data.PQueue.Prio.Max as PQ + data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } | JobSendTestEmail { jEmail :: Email, jMailContext :: MailContext } @@ -75,7 +82,9 @@ data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId } deriving (Eq, Ord, Show, Read, Generic, Typeable) instance Hashable Job +instance NFData Job instance Hashable Notification +instance NFData Notification deriveJSON defaultOptions { constructorTagModifier = camelToPathPiece' 1 @@ -97,10 +106,11 @@ data JobCtl = JobCtlFlush | JobCtlDetermineCrontab | JobCtlQueue Job | JobCtlGenerateHealthReport HealthCheck - | JobCtlNoOp + | JobCtlTest deriving (Eq, Ord, Read, Show, Generic, Typeable) instance Hashable JobCtl +instance NFData JobCtl newtype JobWorkerId = JobWorkerId { jobWorkerUnique :: Unique } @@ -118,8 +128,35 @@ data JobContext = JobContext , jobConfirm :: TVar (HashMap JobCtl (NonEmpty (TMVar (Maybe SomeException)))) } + +data JobPriority = JobPrioBatch | JobPrioRealtime + deriving (Eq, Ord, Enum, Bounded, Read, Show, Generic, Typeable) +instance Universe JobPriority +instance Finite JobPriority +instance NFData JobPriority + +prioritiseJob :: JobCtl -> JobPriority +prioritiseJob JobCtlTest = JobPrioRealtime +prioritiseJob (JobCtlGenerateHealthReport _) = JobPrioRealtime +prioritiseJob JobCtlDetermineCrontab = JobPrioRealtime +prioritiseJob _ = JobPrioBatch + + +newtype JobQueue = JobQueue { getJobQueue :: MaxPQueue JobPriority JobCtl } + deriving (Eq, Ord, Read, Show) + deriving newtype (Monoid, NFData) + +makePrisms ''JobQueue + +jqInsert :: JobCtl -> JobQueue -> JobQueue +jqInsert job = force . over _JobQueue $ PQ.insertBehind (prioritiseJob job) job + +jqDequeue :: JobQueue -> Maybe (JobCtl, JobQueue) +jqDequeue = fmap ((\r@(_, q) -> q `deepseq` r) . over _2 JobQueue) . PQ.maxView . getJobQueue + + data JobState = JobState - { jobWorkers :: Map (Async ()) (TChan JobCtl) + { jobWorkers :: Map (Async ()) (TVar JobQueue) , jobWorkerName :: Async () -> JobWorkerId , jobContext :: JobContext , jobPoolManager :: Async () diff --git a/src/Mail.hs b/src/Mail.hs index 8cfa03200..df285bfa0 100644 --- a/src/Mail.hs +++ b/src/Mail.hs @@ -160,6 +160,7 @@ instance Default MailLanguages where def = MailLanguages [] instance Hashable MailLanguages +instance NFData MailLanguages data MailContext = MailContext @@ -172,10 +173,12 @@ deriveJSON defaultOptions } ''MailContext instance Hashable MailContext +instance NFData MailContext instance Default MailContext where - def = MailContext { mcLanguages = def - , mcDateTimeFormat = def - } + def = MailContext + { mcLanguages = def + , mcDateTimeFormat = def + } makeLenses_ ''MailContext diff --git a/src/Model/Types/Health.hs b/src/Model/Types/Health.hs index ce0f53e23..89de617ab 100644 --- a/src/Model/Types/Health.hs +++ b/src/Model/Types/Health.hs @@ -20,6 +20,7 @@ data HealthCheck instance Universe HealthCheck instance Finite HealthCheck instance Hashable HealthCheck +instance NFData HealthCheck deriveJSON defaultOptions { constructorTagModifier = camelToPathPiece' 2 diff --git a/src/Model/Types/School.hs b/src/Model/Types/School.hs index 198273735..dcbb5ecc6 100644 --- a/src/Model/Types/School.hs +++ b/src/Model/Types/School.hs @@ -12,6 +12,7 @@ data SchoolFunction instance Universe SchoolFunction instance Finite SchoolFunction instance Hashable SchoolFunction +instance NFData SchoolFunction nullaryPathPiece ''SchoolFunction $ camelToPathPiece' 1 pathPieceJSON ''SchoolFunction diff --git a/src/Model/Types/Security.hs b/src/Model/Types/Security.hs index fe1739fd0..cf9681e0e 100644 --- a/src/Model/Types/Security.hs +++ b/src/Model/Types/Security.hs @@ -27,6 +27,7 @@ data AuthenticationMode = AuthLDAP deriving (Eq, Ord, Read, Show, Generic) instance Hashable AuthenticationMode +instance NFData AuthenticationMode deriveJSON defaultOptions { constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel diff --git a/src/Network/Mail/Mime/Instances.hs b/src/Network/Mail/Mime/Instances.hs index 18c22e67a..7861f5c3d 100644 --- a/src/Network/Mail/Mime/Instances.hs +++ b/src/Network/Mail/Mime/Instances.hs @@ -21,9 +21,10 @@ deriving instance Ord Address deriving instance Generic Address instance Hashable Address +instance NFData Address deriveToJSON defaultOptions - { fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel + { fieldLabelModifier = camelToPathPiece' 1 } ''Address instance FromJSON Address where diff --git a/src/Settings.hs b/src/Settings.hs index 5c9d0ba61..0874d2b50 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -155,8 +155,11 @@ data LogDestination = LogDestStderr | LogDestStdout | LogDestFile { logDestFile deriving instance Generic LogLevel instance Hashable LogLevel +instance NFData LogLevel instance Hashable LogSettings +instance NFData LogSettings instance Hashable LogDestination +instance NFData LogDestination data UserDefaultConf = UserDefaultConf { userDefaultTheme :: Theme diff --git a/src/Text/Blaze/Instances.hs b/src/Text/Blaze/Instances.hs index 6bc967a9b..3ff06308b 100644 --- a/src/Text/Blaze/Instances.hs +++ b/src/Text/Blaze/Instances.hs @@ -43,3 +43,6 @@ instance Csv.ToField Markup where instance Csv.FromField Markup where parseField = fmap preEscapedText . Csv.parseField + +instance NFData Markup where + rnf = rnf . Text.renderMarkup