diff --git a/config/settings.yml b/config/settings.yml index cd68a3261..db4316c9c 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -16,6 +16,8 @@ mail-verp: separator: "+" at-replacement: "=" +job-workers: "_env:JOB_WORKERS:16" + detailed-logging: "_env:DETAILED_LOGGING:false" should-log-all: "_env:LOG_ALL:false" minimum-log-level: "_env:LOGLEVEL:warn" @@ -61,8 +63,8 @@ smtp: pass: "_env:SMTPPASS:" pool: stripes: "_env:SMTPSTRIPES:1" - timeout: "_env:SMTPTIMEOUT:20" - limit: "_env:SMTPLIMIT:1" + timeout: "_env:SMTPTIMEOUT:300" + limit: "_env:SMTPLIMIT:8" user-defaults: max-favourites: 12 diff --git a/db.hs b/db.hs index ca4c312a6..71128a7ff 100755 --- a/db.hs +++ b/db.hs @@ -6,6 +6,7 @@ {-# LANGUAGE NoImplicitPrelude #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TypeFamilies #-} import "uniworx" Import hiding (Option(..)) import "uniworx" Application (db, getAppDevSettings) diff --git a/package.yaml b/package.yaml index d620d3fc7..9c28d567d 100644 --- a/package.yaml +++ b/package.yaml @@ -100,6 +100,7 @@ dependencies: - network - resource-pool - mime-mail +- hashable # The library contains all of our application code. The executable # defined below is just a thin wrapper. diff --git a/src/Application.hs b/src/Application.hs index e9a9dab00..bf77cecc9 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -96,7 +96,7 @@ makeFoundation appSettings@(AppSettings{..}) = do appCryptoIDKey <- readKeyFile appCryptoIDKeyFile appInstanceID <- maybe UUID.nextRandom readInstanceIDFile appInstanceIDFile - (appJobCtl, recvChan) <- atomically $ do + (appJobCtl, recvChans) <- fmap unzip . atomically . replicateM appJobWorkers $ do chan <- newBroadcastTMChan recvChan <- dupTMChan chan return (chan, recvChan) @@ -127,7 +127,7 @@ makeFoundation appSettings@(AppSettings{..}) = do -- Perform database migration using our application's logging settings. migrateAll `runSqlPool` sqlPool - liftIO . void . fork . handleJobs $ (mkFoundation sqlPool smtpPool) { appJobCtl = recvChan } + handleJobs recvChans $ mkFoundation sqlPool smtpPool -- Return the foundation return $ mkFoundation sqlPool smtpPool @@ -268,7 +268,7 @@ getApplicationRepl = do shutdownApp :: UniWorX -> IO () shutdownApp UniWorX{..} = do - atomically $ closeTMChan appJobCtl + atomically $ mapM_ closeTMChan appJobCtl --------------------------------------------- @@ -286,7 +286,7 @@ db = handler . runDB addPWEntry :: User -> Text {-^ Password -} -> IO () -addPWEntry User{..} (Text.encodeUtf8 -> pw) = db $ do +addPWEntry User{ userAuthentication = _, ..} (Text.encodeUtf8 -> pw) = db $ do PWHashConf{..} <- getsYesod $ appAuthPWHash . appSettings (AuthPWHash . Text.decodeUtf8 -> userAuthentication) <- liftIO $ makePasswordWith pwHashAlgorithm pw pwHashStrength void $ insert User{..} diff --git a/src/Foundation.hs b/src/Foundation.hs index 4a9cbec87..b47168790 100644 --- a/src/Foundation.hs +++ b/src/Foundation.hs @@ -118,7 +118,7 @@ data UniWorX = UniWorX , appLogger :: Logger , appCryptoIDKey :: CryptoIDKey , appInstanceID :: InstanceId - , appJobCtl :: TMChan JobCtl + , appJobCtl :: [TMChan JobCtl] } type SMTPPool = Pool SMTPConnection diff --git a/src/Import/NoFoundation.hs b/src/Import/NoFoundation.hs index 531535806..fc3bdcda9 100644 --- a/src/Import/NoFoundation.hs +++ b/src/Import/NoFoundation.hs @@ -33,3 +33,5 @@ import Mail as Import import Data.Data as Import (Data) import Data.Typeable as Import (Typeable) import GHC.Generics as Import (Generic) + +import Data.Hashable as Import diff --git a/src/Jobs.hs b/src/Jobs.hs index 8ecbfde31..d8ddd088e 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -35,6 +35,8 @@ import Control.Monad.Trans.Writer (WriterT(..), execWriterT) import Utils.Lens +import Control.Monad.Random (evalRand, uniform, mkStdGen) + data JobQueueException = JInvalid QueuedJobId QueuedJob | JLocked QueuedJobId InstanceId UTCTime @@ -44,66 +46,70 @@ data JobQueueException = JInvalid QueuedJobId QueuedJob instance Exception JobQueueException -handleJobs :: UniWorX -> IO () +handleJobs :: MonadIO m => [TMChan JobCtl] -> UniWorX -> m () -- | Read control commands from `appJobCtl` and address them as they come in -- -- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders. -- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ... -handleJobs foundation@UniWorX{..} = unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan appJobCtl .| handleJobs' - where - logStart = $logDebugS "Jobs" "Started" - logStop = $logDebugS "Jobs" "Shutting down" +handleJobs recvChans foundation@UniWorX{..} = liftIO . forM_ (zip [1..] recvChans) $ \(n, chan) -> let + logStart = $logDebugS ("Jobs #" <> tshow n) "Starting" + logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping" + in void . fork . unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan chan .| handleJobs' n + -handleJobs' :: Sink JobCtl Handler () -handleJobs' = C.mapM_ $ void . handleAny ($logErrorS "Jobs" . tshow) . handleCmd +handleJobs' :: Int -> Sink JobCtl Handler () +handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . handleCmd where + logIdent = "Jobs #" <> tshow wNum + handleQueueException :: MonadLogger m => JobQueueException -> m () - handleQueueException (JInvalid jId j) = $logWarnS "Jobs" $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j - handleQueueException (JNonexistant jId) = $logInfoS "Jobs" $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) - handleQueueException (JLocked jId lInstance lTime) = $logDebugS "Jobs" $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) + handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j + handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) + handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) handleCmd JobCtlFlush = void . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform) - handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \QueuedJob{..} -> do - let - content :: Job - Aeson.Success content = fromJSON queuedJobContent -- `jLocked` ensures that `queuedJobContent` parses + handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do + content <- case fromJSON queuedJobContent of + Aeson.Success c -> return c + Aeson.Error t -> do + $logErrorS logIdent $ "Aeson decoding error: " <> pack t + throwM $ JInvalid jId j - $logDebugS "Jobs" . LT.toStrict . decodeUtf8 $ Aeson.encode content + $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content Last jobDone <- execWriterT $ performJob content - when (fromMaybe False jobDone) $ + when (fromMaybe True jobDone) $ runDB $ delete jId jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a jLocked jId act = do hasLock <- liftIO $ newTVarIO False - val <- runDB . setSerializable $ do - j@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId - maybe (return ()) throwM $ JLocked <$> pure jId <*> queuedJobLockInstance <*> queuedJobLockTime - case fromJSON queuedJobContent :: Aeson.Result Job of - Aeson.Success _ -> return () - Aeson.Error t -> do - $logErrorS "Jobs" $ "Aeson decoding error: " <> pack t - throwM $ JInvalid jId j - instanceID <- getsYesod appInstanceID - now <- liftIO getCurrentTime - val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID - , QueuedJobLockTime =. Just now - ] - liftIO . atomically $ writeTVar hasLock True - return val - act val `finally` whenM (liftIO . atomically $ readTVar hasLock) jUnlock - where - jUnlock :: Handler () - jUnlock = runDB . setSerializable $ - update jId [ QueuedJobLockInstance =. Nothing - , QueuedJobLockTime =. Nothing - ] + + let + lock = runDB . setSerializable $ do + QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId + maybe (return ()) throwM $ JLocked <$> pure jId <*> queuedJobLockInstance <*> queuedJobLockTime + instanceID <- getsYesod appInstanceID + now <- liftIO getCurrentTime + val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID + , QueuedJobLockTime =. Just now + ] + liftIO . atomically $ writeTVar hasLock True + return val + + unlock = whenM (liftIO . atomically $ readTVar hasLock) $ + runDB . setSerializable $ + update jId [ QueuedJobLockInstance =. Nothing + , QueuedJobLockTime =. Nothing + ] + + bracket lock (const unlock) act writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m () writeJobCtl cmd = do - chan <- getsYesod appJobCtl + tid <- liftIO myThreadId + chan <- flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) . uniform <$> getsYesod appJobCtl liftIO . atomically $ writeTMChan chan cmd queueJob :: Job -> YesodDB UniWorX QueuedJobId diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs index a27ad355a..15a3764e8 100644 --- a/src/Jobs/Types.hs +++ b/src/Jobs/Types.hs @@ -21,6 +21,9 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId } deriving (Eq, Ord, Show, Read, Generic, Typeable) +instance Hashable Job +instance Hashable Notification + deriveJSON defaultOptions { constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel , fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel @@ -39,3 +42,5 @@ deriveJSON defaultOptions data JobCtl = JobCtlFlush | JobCtlPerform QueuedJobId deriving (Eq, Ord, Read, Show, Generic, Typeable) + +instance Hashable JobCtl diff --git a/src/Mail.hs b/src/Mail.hs index 08a121600..dec3ff823 100644 --- a/src/Mail.hs +++ b/src/Mail.hs @@ -69,6 +69,8 @@ import qualified Data.Text as Text import qualified Data.Foldable as Foldable +import Data.Hashable + import qualified Data.Text.Lazy as LT import qualified Data.ByteString.Lazy as LBS @@ -127,12 +129,14 @@ instance Monoid (MailSmtpData) where mappend = mappenddefault newtype MailLanguages = MailLanguages { mailLanguages :: [Lang] } - deriving (Eq, Ord, Show, Read) + deriving (Eq, Ord, Show, Read, Generic, Typeable) deriving newtype (FromJSON, ToJSON) instance Default MailLanguages where def = MailLanguages [] +instance Hashable MailLanguages + class (MonadHandler m, MonadState Mail m) => MonadMail m where askMailLanguages :: m MailLanguages tellMailSmtpData :: MailSmtpData -> m () @@ -206,17 +210,19 @@ defMailT ls (MailT mail) = do (ret, mail, smtpData) <- runRWST mail ls (emptyMail fromAddress) mail' <- liftIO $ LBS.toStrict <$> renderMail' mail $logDebugS "Mail" $ "Rendered mail:\n" <> decodeUtf8 mail' - $logInfoS "Mail" $ "Submitting email: " <> tshow smtpData ret <$ case smtpData of MailSmtpData{ smtpEnvelopeFrom = Last Nothing } -> throwM MailNoSenderSpecified MailSmtpData{ smtpRecipients } | Set.null smtpRecipients -> throwM MailNoRecipientsSpecified MailSmtpData{ smtpEnvelopeFrom = Last (Just (unpack -> returnPath)) , smtpRecipients = (map unpack . toList -> recipients) - } -> mailSmtp $ liftIO . SMTP.sendMail - returnPath - recipients - mail' + } -> mailSmtp $ \conn -> do + $logInfoS "Mail" $ "Submitting email: " <> tshow smtpData + liftIO $ SMTP.sendMail + returnPath + recipients + mail' + conn data PrioritisedAlternatives m = PrioritisedAlternatives diff --git a/src/Model/Types.hs b/src/Model/Types.hs index 59ceba0f0..c96fc0989 100644 --- a/src/Model/Types.hs +++ b/src/Model/Types.hs @@ -5,7 +5,7 @@ {-# LANGUAGE NoImplicitPrelude #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE DeriveGeneric, DeriveDataTypeable, GeneralizedNewtypeDeriving #-} -{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses #-} +{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses, FlexibleContexts, UndecidableInstances #-} {-# LANGUAGE ViewPatterns #-} {-- # LANGUAGE ExistentialQuantification #-} -- for DA type {-# OPTIONS_GHC -fno-warn-orphans #-} -- for instance PathPiece (CI Text) @@ -453,6 +453,10 @@ deriveJSON defaultOptions derivePersistFieldJSON ''NotificationSettings +instance ToBackendKey SqlBackend record => Hashable (Key record) where + hashWithSalt s key = s `hashWithSalt` fromSqlKey key + + -- Type synonyms type Email = Text diff --git a/src/Settings.hs b/src/Settings.hs index 8ccdcf635..56efc87a6 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -82,6 +82,7 @@ data AppSettings = AppSettings , appMailFrom :: Address , appMailObjectDomain :: Text , appMailVerp :: VerpMode + , appJobWorkers :: Int , appDetailedRequestLogging :: Bool -- ^ Use detailed request logging system @@ -270,6 +271,8 @@ instance FromJSON AppSettings where appMailObjectDomain <- o .: "mail-object-domain" appMailVerp <- o .: "mail-verp" + appJobWorkers <- o .: "job-workers" + appDetailedRequestLogging <- o .:? "detailed-logging" .!= defaultDev appShouldLogAll <- o .:? "should-log-all" .!= defaultDev appMinimumLogLevel <- o .: "minimum-log-level"