Cleanup & worker pool

This commit is contained in:
Gregor Kleen 2018-10-10 12:18:22 +02:00
parent 4598b38242
commit 20db862f53
11 changed files with 83 additions and 53 deletions

View File

@ -16,6 +16,8 @@ mail-verp:
separator: "+"
at-replacement: "="
job-workers: "_env:JOB_WORKERS:16"
detailed-logging: "_env:DETAILED_LOGGING:false"
should-log-all: "_env:LOG_ALL:false"
minimum-log-level: "_env:LOGLEVEL:warn"
@ -61,8 +63,8 @@ smtp:
pass: "_env:SMTPPASS:"
pool:
stripes: "_env:SMTPSTRIPES:1"
timeout: "_env:SMTPTIMEOUT:20"
limit: "_env:SMTPLIMIT:1"
timeout: "_env:SMTPTIMEOUT:300"
limit: "_env:SMTPLIMIT:8"
user-defaults:
max-favourites: 12

1
db.hs
View File

@ -6,6 +6,7 @@
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}
import "uniworx" Import hiding (Option(..))
import "uniworx" Application (db, getAppDevSettings)

View File

@ -100,6 +100,7 @@ dependencies:
- network
- resource-pool
- mime-mail
- hashable
# The library contains all of our application code. The executable
# defined below is just a thin wrapper.

View File

@ -96,7 +96,7 @@ makeFoundation appSettings@(AppSettings{..}) = do
appCryptoIDKey <- readKeyFile appCryptoIDKeyFile
appInstanceID <- maybe UUID.nextRandom readInstanceIDFile appInstanceIDFile
(appJobCtl, recvChan) <- atomically $ do
(appJobCtl, recvChans) <- fmap unzip . atomically . replicateM appJobWorkers $ do
chan <- newBroadcastTMChan
recvChan <- dupTMChan chan
return (chan, recvChan)
@ -127,7 +127,7 @@ makeFoundation appSettings@(AppSettings{..}) = do
-- Perform database migration using our application's logging settings.
migrateAll `runSqlPool` sqlPool
liftIO . void . fork . handleJobs $ (mkFoundation sqlPool smtpPool) { appJobCtl = recvChan }
handleJobs recvChans $ mkFoundation sqlPool smtpPool
-- Return the foundation
return $ mkFoundation sqlPool smtpPool
@ -268,7 +268,7 @@ getApplicationRepl = do
shutdownApp :: UniWorX -> IO ()
shutdownApp UniWorX{..} = do
atomically $ closeTMChan appJobCtl
atomically $ mapM_ closeTMChan appJobCtl
---------------------------------------------
@ -286,7 +286,7 @@ db = handler . runDB
addPWEntry :: User
-> Text {-^ Password -}
-> IO ()
addPWEntry User{..} (Text.encodeUtf8 -> pw) = db $ do
addPWEntry User{ userAuthentication = _, ..} (Text.encodeUtf8 -> pw) = db $ do
PWHashConf{..} <- getsYesod $ appAuthPWHash . appSettings
(AuthPWHash . Text.decodeUtf8 -> userAuthentication) <- liftIO $ makePasswordWith pwHashAlgorithm pw pwHashStrength
void $ insert User{..}

View File

@ -118,7 +118,7 @@ data UniWorX = UniWorX
, appLogger :: Logger
, appCryptoIDKey :: CryptoIDKey
, appInstanceID :: InstanceId
, appJobCtl :: TMChan JobCtl
, appJobCtl :: [TMChan JobCtl]
}
type SMTPPool = Pool SMTPConnection

View File

@ -33,3 +33,5 @@ import Mail as Import
import Data.Data as Import (Data)
import Data.Typeable as Import (Typeable)
import GHC.Generics as Import (Generic)
import Data.Hashable as Import

View File

@ -35,6 +35,8 @@ import Control.Monad.Trans.Writer (WriterT(..), execWriterT)
import Utils.Lens
import Control.Monad.Random (evalRand, uniform, mkStdGen)
data JobQueueException = JInvalid QueuedJobId QueuedJob
| JLocked QueuedJobId InstanceId UTCTime
@ -44,66 +46,70 @@ data JobQueueException = JInvalid QueuedJobId QueuedJob
instance Exception JobQueueException
handleJobs :: UniWorX -> IO ()
handleJobs :: MonadIO m => [TMChan JobCtl] -> UniWorX -> m ()
-- | Read control commands from `appJobCtl` and address them as they come in
--
-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs foundation@UniWorX{..} = unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan appJobCtl .| handleJobs'
where
logStart = $logDebugS "Jobs" "Started"
logStop = $logDebugS "Jobs" "Shutting down"
handleJobs recvChans foundation@UniWorX{..} = liftIO . forM_ (zip [1..] recvChans) $ \(n, chan) -> let
logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
logStop = $logDebugS ("Jobs #" <> tshow n) "Stopping"
in void . fork . unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan chan .| handleJobs' n
handleJobs' :: Sink JobCtl Handler ()
handleJobs' = C.mapM_ $ void . handleAny ($logErrorS "Jobs" . tshow) . handleCmd
handleJobs' :: Int -> Sink JobCtl Handler ()
handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . handleCmd
where
logIdent = "Jobs #" <> tshow wNum
handleQueueException :: MonadLogger m => JobQueueException -> m ()
handleQueueException (JInvalid jId j) = $logWarnS "Jobs" $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
handleQueueException (JNonexistant jId) = $logInfoS "Jobs" $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS "Jobs" $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
handleCmd JobCtlFlush = void . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \QueuedJob{..} -> do
let
content :: Job
Aeson.Success content = fromJSON queuedJobContent -- `jLocked` ensures that `queuedJobContent` parses
handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
content <- case fromJSON queuedJobContent of
Aeson.Success c -> return c
Aeson.Error t -> do
$logErrorS logIdent $ "Aeson decoding error: " <> pack t
throwM $ JInvalid jId j
$logDebugS "Jobs" . LT.toStrict . decodeUtf8 $ Aeson.encode content
$logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content
Last jobDone <- execWriterT $ performJob content
when (fromMaybe False jobDone) $
when (fromMaybe True jobDone) $
runDB $ delete jId
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
hasLock <- liftIO $ newTVarIO False
val <- runDB . setSerializable $ do
j@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
maybe (return ()) throwM $ JLocked <$> pure jId <*> queuedJobLockInstance <*> queuedJobLockTime
case fromJSON queuedJobContent :: Aeson.Result Job of
Aeson.Success _ -> return ()
Aeson.Error t -> do
$logErrorS "Jobs" $ "Aeson decoding error: " <> pack t
throwM $ JInvalid jId j
instanceID <- getsYesod appInstanceID
now <- liftIO getCurrentTime
val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID
, QueuedJobLockTime =. Just now
]
liftIO . atomically $ writeTVar hasLock True
return val
act val `finally` whenM (liftIO . atomically $ readTVar hasLock) jUnlock
where
jUnlock :: Handler ()
jUnlock = runDB . setSerializable $
update jId [ QueuedJobLockInstance =. Nothing
, QueuedJobLockTime =. Nothing
]
let
lock = runDB . setSerializable $ do
QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
maybe (return ()) throwM $ JLocked <$> pure jId <*> queuedJobLockInstance <*> queuedJobLockTime
instanceID <- getsYesod appInstanceID
now <- liftIO getCurrentTime
val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID
, QueuedJobLockTime =. Just now
]
liftIO . atomically $ writeTVar hasLock True
return val
unlock = whenM (liftIO . atomically $ readTVar hasLock) $
runDB . setSerializable $
update jId [ QueuedJobLockInstance =. Nothing
, QueuedJobLockTime =. Nothing
]
bracket lock (const unlock) act
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl cmd = do
chan <- getsYesod appJobCtl
tid <- liftIO myThreadId
chan <- flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) . uniform <$> getsYesod appJobCtl
liftIO . atomically $ writeTMChan chan cmd
queueJob :: Job -> YesodDB UniWorX QueuedJobId

View File

@ -21,6 +21,9 @@ data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notifica
data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId }
deriving (Eq, Ord, Show, Read, Generic, Typeable)
instance Hashable Job
instance Hashable Notification
deriveJSON defaultOptions
{ constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel
, fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel
@ -39,3 +42,5 @@ deriveJSON defaultOptions
data JobCtl = JobCtlFlush
| JobCtlPerform QueuedJobId
deriving (Eq, Ord, Read, Show, Generic, Typeable)
instance Hashable JobCtl

View File

@ -69,6 +69,8 @@ import qualified Data.Text as Text
import qualified Data.Foldable as Foldable
import Data.Hashable
import qualified Data.Text.Lazy as LT
import qualified Data.ByteString.Lazy as LBS
@ -127,12 +129,14 @@ instance Monoid (MailSmtpData) where
mappend = mappenddefault
newtype MailLanguages = MailLanguages { mailLanguages :: [Lang] }
deriving (Eq, Ord, Show, Read)
deriving (Eq, Ord, Show, Read, Generic, Typeable)
deriving newtype (FromJSON, ToJSON)
instance Default MailLanguages where
def = MailLanguages []
instance Hashable MailLanguages
class (MonadHandler m, MonadState Mail m) => MonadMail m where
askMailLanguages :: m MailLanguages
tellMailSmtpData :: MailSmtpData -> m ()
@ -206,17 +210,19 @@ defMailT ls (MailT mail) = do
(ret, mail, smtpData) <- runRWST mail ls (emptyMail fromAddress)
mail' <- liftIO $ LBS.toStrict <$> renderMail' mail
$logDebugS "Mail" $ "Rendered mail:\n" <> decodeUtf8 mail'
$logInfoS "Mail" $ "Submitting email: " <> tshow smtpData
ret <$ case smtpData of
MailSmtpData{ smtpEnvelopeFrom = Last Nothing } -> throwM MailNoSenderSpecified
MailSmtpData{ smtpRecipients }
| Set.null smtpRecipients -> throwM MailNoRecipientsSpecified
MailSmtpData{ smtpEnvelopeFrom = Last (Just (unpack -> returnPath))
, smtpRecipients = (map unpack . toList -> recipients)
} -> mailSmtp $ liftIO . SMTP.sendMail
returnPath
recipients
mail'
} -> mailSmtp $ \conn -> do
$logInfoS "Mail" $ "Submitting email: " <> tshow smtpData
liftIO $ SMTP.sendMail
returnPath
recipients
mail'
conn
data PrioritisedAlternatives m = PrioritisedAlternatives

View File

@ -5,7 +5,7 @@
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE DeriveGeneric, DeriveDataTypeable, GeneralizedNewtypeDeriving #-}
{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses, FlexibleContexts, UndecidableInstances #-}
{-# LANGUAGE ViewPatterns #-}
{-- # LANGUAGE ExistentialQuantification #-} -- for DA type
{-# OPTIONS_GHC -fno-warn-orphans #-} -- for instance PathPiece (CI Text)
@ -453,6 +453,10 @@ deriveJSON defaultOptions
derivePersistFieldJSON ''NotificationSettings
instance ToBackendKey SqlBackend record => Hashable (Key record) where
hashWithSalt s key = s `hashWithSalt` fromSqlKey key
-- Type synonyms
type Email = Text

View File

@ -82,6 +82,7 @@ data AppSettings = AppSettings
, appMailFrom :: Address
, appMailObjectDomain :: Text
, appMailVerp :: VerpMode
, appJobWorkers :: Int
, appDetailedRequestLogging :: Bool
-- ^ Use detailed request logging system
@ -270,6 +271,8 @@ instance FromJSON AppSettings where
appMailObjectDomain <- o .: "mail-object-domain"
appMailVerp <- o .: "mail-verp"
appJobWorkers <- o .: "job-workers"
appDetailedRequestLogging <- o .:? "detailed-logging" .!= defaultDev
appShouldLogAll <- o .:? "should-log-all" .!= defaultDev
appMinimumLogLevel <- o .: "minimum-log-level"