diff --git a/models b/models index b36d6f93e..d5b82b764 100644 --- a/models +++ b/models @@ -223,7 +223,8 @@ Exam -- By default this file is used in Model.hs (which is imported by Foundation.hs) QueuedJob content Value - created UTCTime - lockInstance UUID Maybe + creationInstance InstanceId + creationTime UTCTime + lockInstance InstanceId Maybe lockTime UTCTime Maybe deriving Eq Read Show Generic Typeable diff --git a/package.yaml b/package.yaml index 3b5db7a49..6b28933e1 100644 --- a/package.yaml +++ b/package.yaml @@ -77,6 +77,8 @@ dependencies: - parsec - uuid - exceptions +- stm +- stm-chans - stm-conduit - lens - MonadRandom diff --git a/src/Foundation.hs b/src/Foundation.hs index 86e90471b..fc6816487 100644 --- a/src/Foundation.hs +++ b/src/Foundation.hs @@ -20,12 +20,12 @@ import Database.Persist.Sql (ConnectionPool, runSqlPool) import Text.Hamlet (hamletFile) import Text.Jasmine (minifym) --- Used only when in "auth-dummy-login" setting is enabled. import Yesod.Auth.Message import Yesod.Auth.Dummy import Auth.LDAP import Auth.PWHash import Auth.Dummy +import Jobs.Types import qualified Network.Wai as W (requestMethod, pathInfo) @@ -115,14 +115,10 @@ data UniWorX = UniWorX , appHttpManager :: Manager , appLogger :: Logger , appCryptoIDKey :: CryptoIDKey - , appInstanceID :: UUID - , appJobCtl :: TMChan JobCtl + , appInstanceID :: InstanceId + , appJobCtl :: TMChan JobCtl } -data JobCtl = NCtlFlush - | NCtlPerform QueuedJobId - deriving (Eq, Ord, Read, Show) - -- This is where we define all of the routes in our application. For a full -- explanation of the syntax, please see: -- http://www.yesodweb.com/book/routing-and-handlers diff --git a/src/Import.hs b/src/Import.hs index cdb0ec16f..a10200156 100644 --- a/src/Import.hs +++ b/src/Import.hs @@ -4,4 +4,3 @@ module Import import Foundation as Import import Import.NoFoundation as Import - diff --git a/src/Jobs.hs b/src/Jobs.hs index e82ad1060..494b83c2f 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -7,50 +7,41 @@ , TypeFamilies , DeriveGeneric , DeriveDataTypeable + , QuasiQuotes #-} module Jobs - ( handleJobs - , Job(..), Notification(..) + ( module Jobs.Types + , writeJobCtl + , queueJob + , handleJobs ) where import Import +import Jobs.Types + import Data.Conduit.TMChan import qualified Data.Conduit.List as C -import Data.Aeson (fromJSON, Result(..), defaultOptions, Options(..)) +import Data.Aeson (fromJSON, toJSON) import qualified Data.Aeson as Aeson -import Data.Aeson.TH (deriveJSON) -import Database.Persist.Sql (rawExecute, fromSqlKey) +import Database.Persist.Sql (executeQQ, fromSqlKey) -data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } - deriving (Eq, Ord, Show, Read) -data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId, nTimestamp :: UTCTime } - deriving (Eq, Ord, Show, Read) - -deriveJSON defaultOptions - { constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel - , fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel - , tagSingleConstructors = True - } ''Job - -deriveJSON defaultOptions - { constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel - , fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel - , tagSingleConstructors = True - } ''Notification - -data JobQueueException = JInvalid QueuedJob - | JLocked QueuedJobId UUID UTCTime +data JobQueueException = JInvalid QueuedJobId QueuedJob + | JLocked QueuedJobId InstanceId UTCTime | JNonexistant QueuedJobId deriving (Read, Show, Eq, Generic, Typeable) instance Exception JobQueueException - + handleJobs :: UniWorX -> IO () +-- | Read control commands from `appJobCtl` and address them as they come in +-- +-- Uses `unsafeHandler`, as per documentation all HTTP-related fields of state/environment are meaningless placeholders. +-- Handling commands in `HandlerT` provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ... handleJobs foundation@UniWorX{..} = unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan appJobCtl .| handleJobs' where logStart = $(logDebugS) "Jobs" "Started" @@ -60,14 +51,12 @@ handleJobs' :: Sink JobCtl Handler () handleJobs' = C.mapM_ $ void . handleAny ($(logErrorS) "Jobs" . tshow) . handleCmd where handleQueueException :: MonadLogger m => JobQueueException -> m () - handleQueueException (JInvalid j) = $(logWarnS) "Jobs" $ "Invalid QueuedJob: " ++ tshow j + handleQueueException (JInvalid jId j) = $(logWarnS) "Jobs" $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j handleQueueException (JNonexistant jId) = $(logInfoS) "Jobs" $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) - handleQueueException (JLocked jId lInstance lTime) = $(logDebugS) "Jobs" $ "Saw locked QueuedJob: " ++ tshow (jId, lInstance, lTime) + handleQueueException (JLocked jId lInstance lTime) = $(logDebugS) "Jobs" $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) - handleCmd NCtlFlush = void . fork . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreated ] .| C.mapM_ cmdPerform - handleCmd (NCtlPerform jId) = handle handleQueueException . (`finally` jUnlock jId) $ do - j@QueuedJob{..} <- jLock jId - + handleCmd JobCtlFlush = void . fork . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform) + handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \QueuedJob{..} -> do let content :: Job Aeson.Success content = fromJSON queuedJobContent @@ -76,29 +65,51 @@ handleJobs' = C.mapM_ $ void . handleAny ($(logErrorS) "Jobs" . tshow) . handleC runDB $ delete jId -jLock :: QueuedJobId -> Handler QueuedJob -jLock jId = runDB $ do - rawExecute "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE" [] - j@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId - maybe (return ()) throwM $ JLocked <$> pure jId <*> queuedJobLockInstance <*> queuedJobLockTime - let isSuccess (Aeson.Success _) = True - isSuccess _ = False - unless (isSuccess (fromJSON queuedJobContent :: Result Job)) . throwM $ JInvalid j - instanceID <- getsYesod appInstanceID - now <- liftIO getCurrentTime - updateGet jId [ QueuedJobLockInstance =. Just instanceID - , QueuedJobLockTime =. Just now - ] - -jUnlock :: QueuedJobId -> Handler () -jUnlock jId = runDB $ update jId [ QueuedJobLockInstance =. Nothing +jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a +jLocked jId act = do + hasLock <- liftIO $ newTVarIO False + val <- runDB $ do + [executeQQ| + SET TRANSACTION ISOLATION LEVEL SERIALIZABLE + |] + j@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId + maybe (return ()) throwM $ JLocked <$> pure jId <*> queuedJobLockInstance <*> queuedJobLockTime + case fromJSON queuedJobContent :: Aeson.Result Job of + Aeson.Success _ -> return () + Aeson.Error t -> do + $logErrorS "Jobs" $ "Aeson decoding error: " <> pack t + throwM $ JInvalid jId j + instanceID <- getsYesod appInstanceID + now <- liftIO getCurrentTime + val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID + , QueuedJobLockTime =. Just now + ] + liftIO . atomically $ writeTVar hasLock True + return val + act val `finally` whenM (liftIO . atomically $ readTVar hasLock) jUnlock + where + jUnlock :: Handler () + jUnlock = runDB $ update jId [ QueuedJobLockInstance =. Nothing , QueuedJobLockTime =. Nothing ] -cmdPerform :: ( MonadHandler m - , HandlerSite m ~ UniWorX - ) => QueuedJobId -> m () -cmdPerform (NCtlPerform -> cmd) = do - chan <- getsYesod appJobCtl +writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m () +writeJobCtl cmd = do + chan <- getsYesod appJobCtl liftIO . atomically $ writeTMChan chan cmd + +queueJob :: Job -> YesodDB UniWorX QueuedJobId +queueJob job = do + now <- liftIO getCurrentTime + self <- getsYesod appInstanceID + jId <- insert QueuedJob + { queuedJobContent = toJSON job + , queuedJobCreationInstance = self + , queuedJobCreationTime = now + , queuedJobLockInstance = Nothing + , queuedJobLockTime = Nothing + } + writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something) + return jId + diff --git a/src/Jobs/Types.hs b/src/Jobs/Types.hs new file mode 100644 index 000000000..00621ae0a --- /dev/null +++ b/src/Jobs/Types.hs @@ -0,0 +1,38 @@ +{-# LANGUAGE TemplateHaskell + , NoImplicitPrelude + #-} + +module Jobs.Types + ( Job(..), Notification(..) + , JobCtl(..) + ) where + +import Import.NoFoundation + +import Data.Aeson (defaultOptions, Options(..), SumEncoding(..)) +import Data.Aeson.TH (deriveJSON) + + +data Job = JobSendNotification { jRecipient :: UserId, jNotification :: Notification } + deriving (Eq, Ord, Show, Read) +data Notification = NotificationSubmissionRated { nSubmission :: SubmissionId, nTimestamp :: UTCTime } + deriving (Eq, Ord, Show, Read) + +deriveJSON defaultOptions + { constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel + , fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel + , tagSingleConstructors = True + , sumEncoding = TaggedObject "job" "data" + } ''Job + +deriveJSON defaultOptions + { constructorTagModifier = intercalate "-" . map toLower . drop 1 . splitCamel + , fieldLabelModifier = intercalate "-" . map toLower . drop 1 . splitCamel + , tagSingleConstructors = True + , sumEncoding = TaggedObject "notification" "data" + } ''Notification + + +data JobCtl = JobCtlFlush + | JobCtlPerform QueuedJobId + deriving (Eq, Ord, Read, Show) diff --git a/src/Model.hs b/src/Model.hs index 55fcfb78c..a6d297443 100644 --- a/src/Model.hs +++ b/src/Model.hs @@ -24,7 +24,6 @@ import Database.Persist.Quasi -- import Data.ByteString import Model.Types -import Data.UUID import Data.Aeson (Value) import Data.Aeson.TH (deriveJSON, defaultOptions) diff --git a/src/Model/Types.hs b/src/Model/Types.hs index 8acb5c58b..cc4861626 100644 --- a/src/Model/Types.hs +++ b/src/Model/Types.hs @@ -453,3 +453,4 @@ type SheetName = CI Text type UserEmail = CI Text type PWHashAlgorithm = ByteString -> PWStore.Salt -> Int -> ByteString +type InstanceId = UUID diff --git a/src/Utils/Message.hs b/src/Utils/Message.hs index 438d21932..c6a518fae 100644 --- a/src/Utils/Message.hs +++ b/src/Utils/Message.hs @@ -1,5 +1,6 @@ {-# LANGUAGE FlexibleInstances, FlexibleContexts #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE NoImplicitPrelude #-} module Utils.Message @@ -13,7 +14,7 @@ import Data.Universe import Utils.PathPiece (finiteFromPathPiece, nullaryToPathPiece) import qualified ClassyPrelude.Yesod (addMessage, addMessageI) -import ClassyPrelude.Yesod (PathPiece(..),MonadHandler,HandlerSite,RenderMessage,Html) +import ClassyPrelude.Yesod hiding (addMessage, addMessageI) data MessageClass = Error | Warning | Info | Success diff --git a/src/Utils/TH.hs b/src/Utils/TH.hs index 45bc84c7e..3f5579269 100644 --- a/src/Utils/TH.hs +++ b/src/Utils/TH.hs @@ -1,11 +1,13 @@ {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE NoImplicitPrelude #-} module Utils.TH where -- Common Utility Functions that require TemplateHaskell -- import Data.Char +import Prelude import Language.Haskell.TH -- import Control.Monad -- import Control.Monad.Trans.Class