83 lines
3.4 KiB
Haskell
83 lines
3.4 KiB
Haskell
{-# LANGUAGE NoImplicitPrelude
|
|
, RecordWildCards
|
|
, TemplateHaskell
|
|
, OverloadedStrings
|
|
, FlexibleContexts
|
|
, ViewPatterns
|
|
, TypeFamilies
|
|
, DeriveGeneric
|
|
, DeriveDataTypeable
|
|
#-}
|
|
|
|
module Notifications
|
|
( handleNotifications
|
|
) where
|
|
|
|
import Import
|
|
|
|
import Data.Conduit.TMChan
|
|
import qualified Data.Conduit.List as C
|
|
|
|
import Data.Aeson (fromJSON, Result(..))
|
|
import Database.Persist.Sql (rawExecute, fromSqlKey)
|
|
|
|
|
|
data NotificationQueueException = QNInvalid QueuedNotification
|
|
| QNLocked QueuedNotificationId UUID UTCTime
|
|
| QNNonexistant QueuedNotificationId
|
|
deriving (Read, Show, Eq, Generic, Typeable)
|
|
|
|
instance Exception NotificationQueueException
|
|
|
|
|
|
handleNotifications :: UniWorX -> IO ()
|
|
handleNotifications foundation@UniWorX{..} = unsafeHandler foundation . bracket_ logStart logStop . runConduit $ sourceTMChan appNotificationCtl .| handleNotifications'
|
|
where
|
|
logStart = $(logDebugS) "Notifications" "Started"
|
|
logStop = $(logDebugS) "Notifications" "Shutting down"
|
|
|
|
handleNotifications' :: Sink NotificationCtl Handler ()
|
|
handleNotifications' = C.mapM_ $ void . handleAny ($(logErrorS) "Notifications" . tshow) . handleCmd
|
|
where
|
|
handleQueueException :: MonadLogger m => NotificationQueueException -> m ()
|
|
handleQueueException (QNInvalid qn) = $(logWarnS) "Notifications" $ "Invalid QueuedNotification: " ++ tshow qn
|
|
handleQueueException (QNNonexistant qnId) = $(logInfoS) "Notifications" $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey qnId)
|
|
handleQueueException (QNLocked qnId lInstance lTime) = $(logDebugS) "Notifications" $ "Saw locked QueuedNotification: " ++ tshow (qnId, lInstance, lTime)
|
|
|
|
handleCmd NCtlFlush = void . fork . runDB . runConduit $ selectKeys [] [ Asc QueuedNotificationCreated ] .| C.mapM_ cmdSend
|
|
handleCmd (NCtlSend qnId) = handle handleQueueException . (`finally` qnUnlock qnId) $ do
|
|
qn@QueuedNotification{..} <- qnLock qnId
|
|
|
|
let
|
|
content :: Notification
|
|
Success content = fromJSON queuedNotificationContent
|
|
|
|
$(logDebugS) "Notifications" $ "Would send: " <> tshow (queuedNotificationRecipient, content) -- FIXME
|
|
|
|
runDB $ delete qnId
|
|
|
|
qnLock :: QueuedNotificationId -> Handler QueuedNotification
|
|
qnLock qnId = runDB $ do
|
|
rawExecute "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE" []
|
|
qn@QueuedNotification{..} <- maybe (throwM $ QNNonexistant qnId) return =<< get qnId
|
|
maybe (return ()) throwM $ QNLocked <$> pure qnId <*> queuedNotificationLockInstance <*> queuedNotificationLockTime
|
|
unless ((fromJSON queuedNotificationContent :: Result Notification) /= mempty) . throwM $ QNInvalid qn
|
|
instanceID <- getsYesod appInstanceID
|
|
now <- liftIO getCurrentTime
|
|
updateGet qnId [ QueuedNotificationLockInstance =. Just instanceID
|
|
, QueuedNotificationLockTime =. Just now
|
|
]
|
|
|
|
qnUnlock :: QueuedNotificationId -> Handler ()
|
|
qnUnlock qnId = runDB $ update qnId [ QueuedNotificationLockInstance =. Nothing
|
|
, QueuedNotificationLockTime =. Nothing
|
|
]
|
|
|
|
|
|
cmdSend :: ( MonadHandler m
|
|
, HandlerSite m ~ UniWorX
|
|
) => QueuedNotificationId -> m ()
|
|
cmdSend (NCtlSend -> cmd) = do
|
|
chan <- getsYesod appNotificationCtl
|
|
liftIO . atomically $ writeTMChan chan cmd
|