fradrive/src/Jobs.hs
2018-10-04 14:53:36 +02:00

138 lines
5.1 KiB
Haskell

{-# LANGUAGE NoImplicitPrelude
, RecordWildCards
, TemplateHaskell
, OverloadedStrings
, FlexibleContexts
, ViewPatterns
, TypeFamilies
, DeriveGeneric
, DeriveDataTypeable
, QuasiQuotes
#-}
module Jobs
( module Jobs.Types
, writeJobCtl
, queueJob
, handleJobs
) where
import Import hiding ((.=))
import Jobs.Types
import Data.Conduit.TMChan
import qualified Data.Conduit.List as C
import qualified Data.Text.Lazy as LT
import Data.Aeson (fromJSON, toJSON)
import qualified Data.Aeson as Aeson
import Database.Persist.Sql (executeQQ, fromSqlKey)
import Data.Monoid (Last(..))
import Control.Monad.Trans.Writer (WriterT(..), execWriterT)
import Utils.Lens
-- | Reasons why a queued job could not be locked for execution; thrown by `jLocked` in this module.
data JobQueueException = JInvalid QueuedJobId QueuedJob -- ^ The stored job content could not be decoded as a `Job`; carries the id and the offending row
                       | JLocked QueuedJobId InstanceId UTCTime -- ^ The job already has both a lock instance and a lock time set; carries the id and those two values
                       | JNonexistant QueuedJobId -- ^ No row exists for the given id
  deriving (Read, Show, Eq, Generic, Typeable)

-- No methods are overridden, so the class defaults of `Exception` apply
instance Exception JobQueueException
-- | Read control commands from 'appJobCtl' and address them as they come in
--
-- Uses 'unsafeHandler', as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in 'HandlerT' provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
--
-- A debug message is logged before the first command is read and another one once processing ends (both via 'bracket_').
handleJobs :: UniWorX -> IO ()
handleJobs foundation@UniWorX{..} = unsafeHandler foundation $ bracket_ announceStart announceStop processCommands
  where
    -- Feed every command arriving on the control channel into the command sink
    processCommands = runConduit $ sourceTMChan appJobCtl .| handleJobs'
    announceStart = $logDebugS "Jobs" "Started"
    announceStop = $logDebugS "Jobs" "Shutting down"
-- | Sink that executes every 'JobCtl' command it receives.
--
-- Whatever 'handleAny' catches while a command runs is rendered with 'tshow' and logged at error level; the handler then returns normally, so the sink moves on to the next command.
handleJobs' :: Sink JobCtl Handler ()
handleJobs' = C.mapM_ $ void . handleAny ($logErrorS "Jobs" . tshow) . handleCmd
  where
    -- Log a 'JobQueueException' at a severity matching its cause instead of letting it propagate
    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j) = $logWarnS "Jobs" $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
    handleQueueException (JNonexistant jId) = $logInfoS "Jobs" $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime) = $logDebugS "Jobs" $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)

    -- Flush: emit a `JobCtlPerform` for every queued job id, ordered by ascending creation time
    handleCmd JobCtlFlush = void . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
    -- Perform: run the job under `jLocked`; the queue entry is deleted only if `performJob` reports completion
    handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \job@QueuedJob{..} ->
      case fromJSON queuedJobContent :: Aeson.Result Job of
        -- `jLocked` already checks that `queuedJobContent` parses, so this branch is not expected to be taken.
        -- Should it happen anyway, report it as `JInvalid` (logged by `handleQueueException`) instead of failing on a partial pattern.
        Aeson.Error _ -> throwM $ JInvalid jId job
        Aeson.Success content -> do
          $logDebugS "Jobs" . LT.toStrict . decodeUtf8 $ Aeson.encode content
          -- The final `Last` value written by `performJob` tells us whether the job is done
          Last jobDone <- execWriterT $ performJob content
          when (fromMaybe False jobDone) $
            runDB $ delete jId
-- | Run an action on a queued job while that job is marked as locked in the database.
--
-- In one database action the job is fetched, checked and its lock fields are set to this instance and the current time.
-- Afterwards the given action is run on the value returned by `updateGet` and the lock fields are reset to `Nothing` again.
--
-- Throws `JNonexistant` if there is no row for the id, `JLocked` if both lock fields are already set, and `JInvalid` if the stored content does not decode as a `Job`.
jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
jLocked jId act = do
  -- Records whether the locking database action got as far as setting the lock fields
  hasLock <- liftIO $ newTVarIO False
  val <- runDB $ do
    setSerializable
    j@QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
    -- `JLocked` is only constructed (and thrown) if both lock fields are `Just`
    maybe (return ()) throwM $ JLocked <$> pure jId <*> queuedJobLockInstance <*> queuedJobLockTime
    -- Reject jobs whose content cannot be decoded before taking the lock
    case fromJSON queuedJobContent :: Aeson.Result Job of
      Aeson.Success _ -> return ()
      Aeson.Error t -> do
        $logErrorS "Jobs" $ "Aeson decoding error: " <> pack t
        throwM $ JInvalid jId j
    instanceID <- getsYesod appInstanceID
    now <- liftIO getCurrentTime
    val <- updateGet jId [ QueuedJobLockInstance =. Just instanceID
                         , QueuedJobLockTime =. Just now
                         ]
    liftIO . atomically $ writeTVar hasLock True
    return val
  -- NOTE(review): this line is only reached if the `runDB` above returned normally, at which point `hasLock` has been set to `True`; the flag therefore looks redundant here -- confirm
  -- NOTE(review): `finally` is installed only after the lock was taken; an asynchronous exception arriving in between would presumably leave the lock fields set -- confirm
  act val `finally` whenM (liftIO . atomically $ readTVar hasLock) jUnlock
  where
    -- Release the lock by resetting both lock fields
    jUnlock :: Handler ()
    jUnlock = runDB $ do
      setSerializable
      update jId [ QueuedJobLockInstance =. Nothing
                 , QueuedJobLockTime =. Nothing
                 ]
    -- Raw SQL statement requesting serializable isolation; issued first in each database action above
    setSerializable = [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|]
-- | Write a control command to the job control channel ('appJobCtl') of the foundation value.
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl cmd = getsYesod appJobCtl >>= \ctlChan ->
  liftIO . atomically $ writeTMChan ctlChan cmd
-- | Store a 'Job' in the queue and ask for it to be performed.
--
-- The job is inserted as an unlocked 'QueuedJob' (both lock fields 'Nothing') stamped with the current time and this instance's id.
-- A 'JobCtlPerform' for the new id is then written via 'writeJobCtl' and the id is returned.
queueJob :: Job -> YesodDB UniWorX QueuedJobId
queueJob job = do
  creationTime <- liftIO getCurrentTime
  creationInstance <- getsYesod appInstanceID
  let
    queued = QueuedJob
      { queuedJobContent = toJSON job
      , queuedJobCreationInstance = creationInstance
      , queuedJobCreationTime = creationTime
      , queuedJobLockInstance = Nothing
      , queuedJobLockTime = Nothing
      }
  newId <- insert queued
  -- NOTE(review): the command is written from inside the database action; whether it can be picked up before the insert is committed depends on how callers run this action -- confirm
  writeJobCtl $ JobCtlPerform newId -- FIXME: Should do fancy load balancing across instances (or something)
  return newId
-- | Execute a single 'Job'.
--
-- The 'Last' 'Bool' accumulated in the writer is the completion signal: `handleCmd` in this module deletes the queue entry only if the final value is @Just True@.
-- A clause that writes nothing therefore leaves its job in the queue.
performJob :: Job -> WriterT (Last Bool) (HandlerT UniWorX IO) ()
-- Not implemented yet: only logs and does not signal completion, so the job stays queued
performJob JobSendNotification{ jNotification = NotificationSubmissionRated{..}, .. } = do
  $logDebugS "Jobs" "NotificationSubmissionRated" -- FIXME
performJob JobSendTestEmail{..} = do
  $logInfoS "Jobs" $ "Sending test-email to " <> jEmail
  mailT jLanguages $ do
    _mailTo .= [Address Nothing jEmail]
    setSubjectI MsgMailTestSubject
    addPart (($ MsgMailTestContent) :: (UniWorXMessage -> Text) -> Text) -- FIXME
  -- Signal completion so the job is removed from the queue; without this the entry would remain and be performed again on every `JobCtlFlush`.
  -- Only reached if `mailT` returned normally.
  WriterT $ return ((), Last (Just True))