{-# LANGUAGE NoImplicitPrelude
           , RecordWildCards
           , TemplateHaskell
           , OverloadedStrings
           , FlexibleContexts
           , ViewPatterns
           , TypeFamilies
           , DeriveGeneric
           , DeriveDataTypeable
           , QuasiQuotes
           #-}

-- | Persistent job queue: jobs are stored as 'QueuedJob' rows, announced to
-- worker threads via 'JobCtl' commands and executed by 'performJob'.
module Jobs
  ( module Jobs.Types
  , writeJobCtl
  , queueJob
  , handleJobs
  ) where

import Import hiding ((.=))

import Jobs.Types

import Data.Conduit.TMChan
import qualified Data.Conduit.List as C

import qualified Data.Text.Lazy as LT

import Data.Aeson (fromJSON, toJSON)
import qualified Data.Aeson as Aeson

import Database.Persist.Sql (executeQQ, fromSqlKey, transactionSave)

import Data.Monoid (Last(..))
import Control.Monad.Trans.Writer (WriterT(..), execWriterT)

import Utils.Lens

import Control.Monad.Random (evalRand, uniform, mkStdGen)


-- | Reasons a worker may refuse to execute a queued job.
--
-- These are thrown while acquiring the per-job lock or while decoding the job
-- and are turned into log messages by the worker (see @handleQueueException@
-- in the @where@ clause of 'handleJobs'').
data JobQueueException
  = JInvalid QueuedJobId QueuedJob
    -- ^ The stored job content could not be decoded from JSON
  | JLocked QueuedJobId InstanceId UTCTime
    -- ^ The job already carries a lock (locking instance and lock time)
  | JNonexistant QueuedJobId
    -- ^ No job is stored under the given id
  deriving (Read, Show, Eq, Generic, Typeable)

instance Exception JobQueueException


-- | Read control commands from 'appJobCtl' and address them as they come in
--
-- One worker thread is forked per channel in the given list; workers are
-- numbered starting at 1 and log \"Starting\" / \"Stopping\" around their
-- lifetime.
--
-- Uses 'unsafeHandler', as per documentation all HTTP-related fields of state/environment are meaningless placeholders.
-- Handling commands in 'HandlerT' provides us with the facilities to render urls, unifies logging, provides a value of the foundation type, ...
handleJobs :: MonadIO m => [TMChan JobCtl] -> UniWorX -> m ()
handleJobs recvChans foundation@UniWorX{..} = liftIO . forM_ (zip [1..] recvChans) $ \(n, chan) ->
  let
    logStart = $logDebugS ("Jobs #" <> tshow n) "Starting"
    logStop  = $logDebugS ("Jobs #" <> tshow n) "Stopping"
  in void . fork . unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan chan .| handleJobs' n

-- | Worker loop body: consume 'JobCtl' commands and handle each one.
--
-- The argument is the worker number; it is only used to tag log messages.
-- Any synchronous exception escaping a single command is logged as an error
-- (via 'handleAny') so that one failing command does not end the worker.
handleJobs' :: Int -> Sink JobCtl Handler ()
handleJobs' wNum = C.mapM_ $ void . handleAny ($logErrorS logIdent . tshow) . handleCmd
  where
    -- Log source shared by all messages of this worker
    logIdent = "Jobs #" <> tshow wNum

    -- Translate queue-level exceptions into log messages of fitting severity
    handleQueueException :: MonadLogger m => JobQueueException -> m ()
    handleQueueException (JInvalid jId j) = $logWarnS logIdent $ "Invalid QueuedJob (#" ++ tshow (fromSqlKey jId) ++ "): " ++ tshow j
    handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
    handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)

    -- 'JobCtlFlush': announce every stored job (oldest creation time first)
    -- as a 'JobCtlPerform' command.
    handleCmd JobCtlFlush = void . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (writeJobCtl . JobCtlPerform)
    -- 'JobCtlPerform': lock the job, decode and run it, then delete it unless
    -- the job explicitly reported @Last (Just False)@.
    handleCmd (JobCtlPerform jId) = handle handleQueueException . jLocked jId $ \j@QueuedJob{..} -> do
      content <- case fromJSON queuedJobContent of
        Aeson.Success c -> return c
        Aeson.Error t -> do
          $logErrorS logIdent $ "Aeson decoding error: " <> pack t
          throwM $ JInvalid jId j

      $logDebugS logIdent . LT.toStrict . decodeUtf8 $ Aeson.encode content

      Last jobDone <- execWriterT $ performJob content

      -- A job that writes nothing to the 'Last' accumulator counts as done
      when (fromMaybe True jobDone) $
        runDB $ delete jId

    -- Run an action on a job while holding its database-level lock.
    --
    -- Throws 'JNonexistant' if the job is not stored and 'JLocked' if both
    -- lock fields are already set.  The lock fields are cleared again when
    -- the action finishes, normally or by exception.
    jLocked :: QueuedJobId -> (QueuedJob -> Handler a) -> Handler a
    jLocked jId act = do
      -- Records whether the lock was actually taken, so that release only
      -- clears lock fields this call has set itself.
      hasLock <- liftIO $ newTVarIO False

      let
        lock = runDB . setSerializable $ do
          QueuedJob{..} <- maybe (throwM $ JNonexistant jId) return =<< get jId
          -- Only a job with both lock fields present is considered locked
          maybe (return ()) throwM $ JLocked jId <$> queuedJobLockInstance <*> queuedJobLockTime
          instanceID <- getsYesod appInstanceID
          now <- liftIO getCurrentTime
          val <- updateGet jId
            [ QueuedJobLockInstance =. Just instanceID
            , QueuedJobLockTime     =. Just now
            ]
          liftIO . atomically $ writeTVar hasLock True
          return val

        unlock = whenM (liftIO . atomically $ readTVar hasLock) $
          runDB . setSerializable $
            update jId
              [ QueuedJobLockInstance =. Nothing
              , QueuedJobLockTime     =. Nothing
              ]

      bracket lock (const unlock) act

-- | Send a control command to one of the worker channels in 'appJobCtl'.
--
-- The channel is picked with 'uniform' from a generator seeded by the hash of
-- the calling thread's id salted into the command, i.e. the choice is
-- deterministic for a given thread and command.
--
-- NOTE(review): presumably 'uniform' fails on an empty collection, so this
-- relies on 'appJobCtl' being non-empty -- confirm against the foundation
-- setup.
writeJobCtl :: (MonadHandler m, HandlerSite m ~ UniWorX) => JobCtl -> m ()
writeJobCtl cmd = do
  tid <- liftIO myThreadId
  chan <- flip evalRand (mkStdGen (hash tid `hashWithSalt` cmd)) . uniform <$> getsYesod appJobCtl
  liftIO . atomically $ writeTMChan chan cmd

-- | Store a job in the database and ask a local worker to perform it.
--
-- The job is inserted unlocked, stamped with the current time and this
-- instance's id, inside a serializable transaction (see 'setSerializable');
-- the 'JobCtlPerform' command is written only after that block has finished.
-- Returns the id of the newly stored job.
queueJob :: Job -> YesodDB UniWorX QueuedJobId
queueJob job = do
  jId <- setSerializable $ do
    now <- liftIO getCurrentTime
    self <- getsYesod appInstanceID
    insert QueuedJob
      { queuedJobContent = toJSON job
      , queuedJobCreationInstance = self
      , queuedJobCreationTime = now
      , queuedJobLockInstance = Nothing
      , queuedJobLockTime = Nothing
      }
  writeJobCtl $ JobCtlPerform jId -- FIXME: Should do fancy load balancing across instances (or something)
  return jId

-- | Run a database action in its own transaction at isolation level
-- @SERIALIZABLE@.
--
-- Calls 'transactionSave' before switching the isolation level and again
-- after the action, so work done by the caller before and during this call
-- is saved rather than left to the caller's enclosing transaction.
--
-- NOTE(review): no retry is attempted here if the database rejects the
-- transaction -- verify callers tolerate the resulting exception.
setSerializable :: DB a -> DB a
setSerializable act = do
  transactionSave
  [executeQQ|SET TRANSACTION ISOLATION LEVEL SERIALIZABLE|]
  act <* transactionSave


-- | Execute a single job.
--
-- The 'Last' 'Bool' written by the job tells the caller whether the job is
-- finished; the caller in 'handleJobs'' deletes the job unless @Just False@
-- was written.
performJob :: Job -> WriterT (Last Bool) (HandlerT UniWorX IO) ()
performJob JobSendNotification{ jNotification = NotificationSubmissionRated{..}, .. } = do
  $logDebugS "Jobs" "NotificationSubmissionRated"
  fail "NotificationSubmissionRated not implemented yet" -- TODO
performJob JobSendTestEmail{..} = do
  $logInfoS "Jobs" $ "Sending test-email to " <> jEmail
  mailT jLanguages $ do
    _mailTo .= [Address Nothing jEmail]
    setSubjectI MsgMailTestSubject
    addPart $ \(MsgRenderer mr) -> mr MsgMailTestContent