diff --git a/config/settings.yml b/config/settings.yml index 80eae5bb0..71554542b 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -123,7 +123,7 @@ widget-memcached: host: "_env:WIDGET_MEMCACHED_HOST:" port: "_env:WIDGET_MEMCACHED_PORT:11211" auth: [] - limit: "_env:WIDGET_MEMCACHED_LIMIT:10" + limit: "_env:WIDGET_MEMCACHED_LIMIT:1024" timeout: "_env:WIDGET_MEMCACHED_TIMEOUT:20" base-url: "_env:WIDGET_MEMCACHED_ROOT:" expiration: "_env:WIDGET_MEMCACHED_EXPIRATION:3600" @@ -132,10 +132,18 @@ session-memcached: host: "_env:SESSION_MEMCACHED_HOST:" port: "_env:SESSION_MEMCACHED_PORT:11211" auth: [] - limit: "_env:SESSION_MEMCACHED_LIMIT:10" + limit: "_env:SESSION_MEMCACHED_LIMIT:1024" timeout: "_env:SESSION_MEMCACHED_TIMEOUT:20" expiration: "_env:SESSION_MEMCACHED_EXPIRATION:28807" +memcached: + host: "_env:MEMCACHED_HOST:" + port: "_env:MEMCACHED_PORT:11211" + auth: [] + limit: "_env:MEMCACHED_LIMIT:1024" + timeout: "_env:MEMCACHED_TIMEOUT:20" + expiration: "_env:MEMCACHED_EXPIRATION:300" + server-sessions: idle-timeout: 28807 absolute-timeout: 604801 diff --git a/package.yaml b/package.yaml index 40d9a3702..35fc11517 100644 --- a/package.yaml +++ b/package.yaml @@ -143,6 +143,8 @@ dependencies: - rfc5051 - unidecode - pandoc + - token-bucket + - async other-extensions: - GeneralizedNewtypeDeriving diff --git a/src/Application.hs b/src/Application.hs index 3be9cf038..69c254e9c 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -175,7 +175,7 @@ makeFoundation appSettings'@AppSettings{..} = do -- logging function. To get out of this loop, we initially create a -- temporary foundation without a real connection pool, get a log function -- from there, and then create the real foundation. - let mkFoundation appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID = UniWorX {..} + let mkFoundation appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached = UniWorX {..} -- The UniWorX {..} syntax is an example of record wild cards. For more -- information, see: -- https://ocharles.org.uk/blog/posts/2014-12-04-record-wildcards.html @@ -189,6 +189,7 @@ makeFoundation appSettings'@AppSettings{..} = do (error "widgetMemcached forced in tempFoundation") (error "JSONWebKeySet forced in tempFoundation") (error "ClusterID forced in tempFoundation") + (error "memcached forced in tempFoundation") runAppLoggingT tempFoundation $ do $logInfoS "InstanceID" $ UUID.toText appInstanceID @@ -226,9 +227,15 @@ makeFoundation appSettings'@AppSettings{..} = do appJSONWebKeySet <- clusterSetting (Proxy :: Proxy 'ClusterJSONWebKeySet) `runSqlPool` sqlPool appClusterID <- clusterSetting (Proxy :: Proxy 'ClusterId) `runSqlPool` sqlPool + appMemcached <- for appMemcachedConf $ \memcachedConf -> do + $logDebugS "setup" "Memcached" + memcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterMemcachedKey) `runSqlPool` sqlPool + memcached <- createMemcached memcachedConf + return (memcachedKey, memcached) + appSessionStore <- mkSessionStore appSettings' sqlPool `runSqlPool` sqlPool - let foundation = mkFoundation sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID + let foundation = mkFoundation sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached -- Return the foundation $logDebugS "setup" "Done" @@ -490,10 +497,10 @@ appMain = runResourceT $ do case watchdogMicroSec of Just wInterval | maybe True (== myProcessID) watchdogProcess - -> let notifyWatchdog :: forall a. IO a - notifyWatchdog = runAppLoggingT foundation $ go Nothing + -> let notifyWatchdog :: forall a m'. ( MonadLogger m', MonadIO m') => m' a + notifyWatchdog = go Nothing where - go :: Maybe (Set (UTCTime, HealthReport)) -> LoggingT IO a + go :: Maybe (Set (UTCTime, HealthReport)) -> m' a go pResults = do let delay = floor $ wInterval % 2 d <- liftIO $ newDelay delay diff --git a/src/Foundation/Type.hs b/src/Foundation/Type.hs index ade7a5faa..235b46c20 100644 --- a/src/Foundation/Type.hs +++ b/src/Foundation/Type.hs @@ -14,6 +14,7 @@ import Jobs.Types import Yesod.Core.Types (Logger) import qualified Crypto.Saltine.Core.SecretBox as SecretBox +import qualified Crypto.Saltine.Core.AEAD as AEAD import qualified Jose.Jwk as Jose import qualified Database.Memcached.Binary.IO as Memcached @@ -49,6 +50,7 @@ data UniWorX = UniWorX , appSecretBoxKey :: SecretBox.Key , appJSONWebKeySet :: Jose.JwkSet , appHealthReport :: TVar (Set (UTCTime, HealthReport)) + , appMemcached :: Maybe (AEAD.Key, Memcached.Connection) } makeLenses_ ''UniWorX diff --git a/src/Handler/Utils.hs b/src/Handler/Utils.hs index 8ba6f99ce..5b4e5f5c0 100644 --- a/src/Handler/Utils.hs +++ b/src/Handler/Utils.hs @@ -24,6 +24,7 @@ import Handler.Utils.I18n as Handler.Utils import Handler.Utils.Widgets as Handler.Utils import Handler.Utils.Database as Handler.Utils import Handler.Utils.Occurrences as Handler.Utils +import Handler.Utils.Memcached as Handler.Utils import Control.Monad.Logger diff --git a/src/Handler/Utils/Memcached.hs b/src/Handler/Utils/Memcached.hs new file mode 100644 index 000000000..5701fbe99 --- /dev/null +++ b/src/Handler/Utils/Memcached.hs @@ -0,0 +1,526 @@ +module Handler.Utils.Memcached + ( memcached, memcachedBy + , memcachedHere, memcachedByHere + , memcachedSet, memcachedGet + , memcachedByGet, memcachedBySet + , memcachedTimeout, memcachedTimeoutBy + , memcachedTimeoutHere, memcachedTimeoutByHere + , memcachedLimited, memcachedLimitedKey, memcachedLimitedBy, memcachedLimitedKeyBy + , memcachedLimitedHere, memcachedLimitedKeyHere, memcachedLimitedByHere, memcachedLimitedKeyByHere + , memcachedLimitedTimeout, memcachedLimitedKeyTimeout, memcachedLimitedTimeoutBy, memcachedLimitedKeyTimeoutBy + , memcachedLimitedTimeoutHere, memcachedLimitedKeyTimeoutHere, memcachedLimitedTimeoutByHere, memcachedLimitedKeyTimeoutByHere + , Expiry + , MemcachedException(..), AsyncTimeoutException(..) + ) where + +import Import hiding (utc, exp) + + +import qualified Database.Memcached.Binary.IO as Memcached +import Data.Bits (Bits(zeroBits), toIntegralSized) + +import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime) + +import qualified Data.Binary as Binary + +import qualified Crypto.MAC.KMAC as KMAC +import Crypto.Hash.Algorithms (SHAKE256) + +import qualified Data.ByteString.Lazy as LBS +import qualified Data.ByteArray as BA + +import Language.Haskell.TH + +import Data.Typeable (typeRep) +import Type.Reflection (typeOf, TypeRep) +import qualified Type.Reflection as Refl (typeRep) +import Data.Type.Equality (TestEquality(..)) + +import qualified Data.HashMap.Strict as HashMap + +import Control.Concurrent.TokenBucket (TokenBucket, newTokenBucket, tokenBucketTryAlloc) + +import System.IO.Unsafe (unsafePerformIO) + +import Control.Concurrent.STM.Delay + +import qualified Crypto.Saltine.Class as Saltine +import qualified Crypto.Saltine.Internal.ByteSizes as Saltine +import qualified Crypto.Saltine.Core.AEAD as AEAD + +import qualified Control.Monad.State.Class as State + + +type Expiry = (Either UTCTime DiffTime) + +_MemcachedExpiry :: Prism' Expiry Memcached.Expiry +_MemcachedExpiry = prism' fromExpiry toExpiry + where toExpiry (Left utc) + | posix > 2592000 = toIntegralSized posix + | otherwise = Nothing + where posix :: Integer + posix = ceiling $ utcTimeToPOSIXSeconds utc + toExpiry (Right dTime) + | 0 < dTime + , dTime <= 2592000 + = Just $ ceiling dTime + | otherwise + = Nothing + + fromExpiry n + | n <= 2592000 + = Right $ fromIntegral n + | otherwise + = Left . posixSecondsToUTCTime $ fromIntegral n + + +data MemcachedException = MemcachedException Memcached.MemcachedException + | MemcachedInvalidExpiry Expiry + deriving (Show, Typeable) + deriving anyclass (Exception) + + +memcachedKey :: ( Typeable a + , Binary k + ) + => AEAD.Key -> Proxy a -> k -> ByteString +memcachedKey (Saltine.encode -> kmacKey) p k = Binary.encode k + & KMAC.finalize . KMAC.updates (KMAC.initialize @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey) . LBS.toChunks + & BA.convert + +memcachedByGet :: forall a k m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , Typeable a, Binary a + , Binary k + ) + => k -> m (Maybe a) +memcachedByGet k = runMaybeT $ do + (aeadKey, conn) <- MaybeT $ getsYesod appMemcached + let cKey = memcachedKey aeadKey (Proxy @a) k + + encVal <- fmap toStrict . hoist liftIO . catchIfMaybeT Memcached.isItemNotStored $ Memcached.get_ cKey conn + + guard $ length encVal >= Saltine.secretBoxNonce + Saltine.secretBoxMac + let (nonceBS, encrypted) = splitAt Saltine.secretBoxNonce encVal + nonce <- hoistMaybe $ Saltine.decode nonceBS + decrypted <- hoistMaybe $ AEAD.aeadOpen aeadKey nonce encrypted cKey + + case Binary.decodeOrFail $ fromStrict decrypted of + Right (unconsumed, _, v) + | null unconsumed -> return v + _other -> mzero + +memcachedBySet :: forall a k m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + , Binary k + ) + => Maybe Expiry -> k -> a -> m () +memcachedBySet mExp k v = do + mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry + mConn <- getsYesod appMemcached + for_ mConn $ \(aeadKey, conn) -> do + nonce <- liftIO AEAD.newNonce + let cKey = memcachedKey aeadKey (Proxy @a) k + encVal = Saltine.encode nonce <> AEAD.aead aeadKey nonce (toStrict $ Binary.encode v) cKey + liftIO $ Memcached.add zeroBits (fromMaybe zeroBits mExp') cKey (fromStrict encVal) conn + + +newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a } + deriving (Typeable) + deriving newtype (Eq, Ord, Show, Binary) + +memcachedGet :: ( MonadHandler m, HandlerSite m ~ UniWorX + , Typeable a, Binary a + ) + => m (Maybe a) +memcachedGet = fmap unMemcachedUnkeyed <$> memcachedByGet () + +memcachedSet :: ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + ) + => Maybe Expiry -> a -> m () +memcachedSet mExp = memcachedBySet mExp () . MemcachedUnkeyed + + +memcachedWith :: Monad m + => (m (Maybe a), a -> m ()) -> m a -> m a +memcachedWith (doGet, doSet) act = do + pRes <- doGet + maybe id (const . return) pRes $ do + res <- act + doSet res + return res + +memcached :: ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + ) + => Maybe Expiry -> m a -> m a +memcached mExp = memcachedWith (memcachedGet, memcachedSet mExp) + +memcachedBy :: ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + , Binary k + ) + => Maybe Expiry -> k -> m a -> m a +memcachedBy mExp k = memcachedWith (memcachedByGet k, memcachedBySet mExp k) + + +newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a } + deriving (Typeable) + deriving newtype (Eq, Ord, Show, Binary) + +memcachedHere :: Q Exp +memcachedHere = do + loc <- location + [e| \mExp -> fmap unMemcachedUnkeyedLoc . memcachedBy mExp loc . fmap MemcachedUnkeyedLoc |] + +newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a } + deriving (Typeable) + deriving newtype (Eq, Ord, Show, Binary) + +memcachedByHere :: Q Exp +memcachedByHere = do + loc <- location + [e| \mExp k -> fmap unMemcachedKeyedLoc . memcachedBy mExp (loc, k) . fmap MemcachedKeyedLoc |] + + +data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a + +instance Hashable HashableDynamic where + hashWithSalt s (HashableDynamic tRep v) = s `hashWithSalt` tRep `hashWithSalt` v +instance Eq HashableDynamic where + (HashableDynamic tRep v) == (HashableDynamic tRep' v') = case testEquality tRep tRep' of + Just Refl -> v == v' + Nothing -> False + +hashableDynamic :: forall a. + ( Typeable a, Hashable a, Eq a ) + => a -> HashableDynamic +hashableDynamic v = HashableDynamic (typeOf v) v + +memcachedLimit :: TVar (HashMap HashableDynamic TokenBucket) +memcachedLimit = unsafePerformIO . newTVarIO $ HashMap.empty +{-# NOINLINE memcachedLimit #-} + +memcachedLimitedWith :: ( MonadIO m + , Typeable k', Hashable k', Eq k' + ) + => (m (Maybe a), a -> m ()) + -> (m a -> MaybeT m a) -- ^ Wrap execution on cache miss + -> k' -- ^ Key for limiting + -> Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> m a + -> m (Maybe a) +memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate tokens act = runMaybeT $ do + pRes <- lift doGet + maybe id (const . return) pRes $ do + mBucket <- fmap (HashMap.lookup lK) . liftIO $ readTVarIO memcachedLimit + bucket <- case mBucket of + Just bucket -> return bucket + Nothing -> liftIO $ do + bucket <- newTokenBucket + atomically $ do + hm <- readTVar memcachedLimit + let hm' = HashMap.insertWith (flip const) lK bucket hm + writeTVar memcachedLimit $! hm' + return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm' + guardM . liftIO $ tokenBucketTryAlloc bucket burst rate tokens + + liftAct $ do + res <- act + doSet res + return res + +memcachedLimited :: forall a m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + ) + => Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> m a + -> m (Maybe a) +memcachedLimited burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift (Proxy @a) burst rate tokens + +memcachedLimitedKey :: forall a k' m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + , Typeable k', Hashable k', Eq k' + ) + => k' + -> Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> m a + -> m (Maybe a) +memcachedLimitedKey lK burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift lK burst rate tokens + +memcachedLimitedBy :: forall a k m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + , Binary k + ) + => Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> k + -> m a + -> m (Maybe a) +memcachedLimitedBy burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift (Proxy @a) burst rate tokens + +memcachedLimitedKeyBy :: forall a k' k m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , Typeable a, Binary a + , Typeable k', Hashable k', Eq k' + , Binary k + ) + => k' + -> Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> k + -> m a + -> m (Maybe a) +memcachedLimitedKeyBy lK burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift lK burst rate tokens + +memcachedLimitedHere :: Q Exp +memcachedLimitedHere = do + loc <- location + [e| \burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedBy burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |] + +memcachedLimitedKeyHere :: Q Exp +memcachedLimitedKeyHere = do + loc <- location + [e| \lK burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |] + +memcachedLimitedByHere :: Q Exp +memcachedLimitedByHere = do + loc <- location + [e| \burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedBy burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |] + +memcachedLimitedKeyByHere :: Q Exp +memcachedLimitedKeyByHere = do + loc <- location + [e| \lK burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |] + + +data AsyncTimeoutException = AsyncTimeoutReturnTypeDoesNotMatchComputationKey + deriving (Show, Typeable) + deriving anyclass (Exception) + +data DynamicAsync = forall a. DynamicAsync !(TypeRep a) !(Async a) + +instance Eq DynamicAsync where + (DynamicAsync tRep v) == (DynamicAsync tRep' v') = case testEquality tRep tRep' of + Just Refl -> v == v' + Nothing -> False + +memcachedAsync :: TVar (HashMap HashableDynamic DynamicAsync) +memcachedAsync = unsafePerformIO . newTVarIO $ HashMap.empty +{-# NOINLINE memcachedAsync #-} + +liftAsyncTimeout :: forall k'' a m. + ( MonadResource m, MonadUnliftIO m + , MonadThrow m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a + ) + => DiffTime + -> k'' + -> m a -> MaybeT m a +liftAsyncTimeout dt (hashableDynamic -> cK) act = do + delay <- liftIO . newDelay . round $ toRational dt / 1e6 + + act' <- lift $ do + existing <- traverse castDynamicAsync . HashMap.lookup cK <=< liftIO $ readTVarIO memcachedAsync + case existing of + Just act' -> return act' + Nothing -> do + startAct <- liftIO newEmptyTMVarIO + act' <- allocateLinkedAsync $ do + atomically $ takeTMVar startAct + act + act'' <- atomically $ do + hm <- readTVar memcachedAsync + let new = DynamicAsync (Refl.typeRep @a) act' + go mOld = case mOld of + Just old' -> do + old <- castDynamicAsync old' + resolved <- lift $ is _Just <$> pollSTM old + + if | resolved -> return $ Just new + | otherwise -> do + State.put old + return $ Just old' + Nothing -> return $ Just new + + (hm', act'') <- runStateT (HashMap.alterF go cK hm) act' + writeTVar memcachedAsync $! hm' + return act'' + if | act' == act'' -> atomically $ putTMVar startAct () + | otherwise -> cancel act' + return act'' + + MaybeT . atomically $ (Nothing <$ waitDelay delay) <|> (Just <$> waitSTM act') + where + castDynamicAsync :: forall m'. MonadThrow m' => DynamicAsync -> m' (Async a) + castDynamicAsync (DynamicAsync tRep v) + | Just Refl <- testEquality tRep (Refl.typeRep @a) + = return v + | otherwise + = throwM AsyncTimeoutReturnTypeDoesNotMatchComputationKey + +memcachedTimeoutWith :: ( MonadResource m, MonadUnliftIO m + , MonadThrow m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a + ) + => (m (Maybe a), a -> m ()) -> DiffTime -> k'' -> m a -> m (Maybe a) +memcachedTimeoutWith (doGet, doSet) dt cK act = runMaybeT $ do + pRes <- lift doGet + maybe id (const . return) pRes $ + liftAsyncTimeout dt cK $ do + res <- act + doSet res + return res + +memcachedTimeout :: ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , MonadUnliftIO m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a, Binary a + ) + => Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a) +memcachedTimeout mExp = memcachedTimeoutWith (memcachedGet, memcachedSet mExp) + +memcachedTimeoutBy :: ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , MonadUnliftIO m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a, Binary a + , Binary k + ) + => Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a) +memcachedTimeoutBy mExp dt cK k = memcachedTimeoutWith (memcachedByGet k, memcachedBySet mExp k) dt cK + +memcachedTimeoutHere :: Q Exp +memcachedTimeoutHere = do + loc <- location + [e| \mExp dt cK -> fmap unMemcachedUnkeyedLoc . memcachedTimeoutBy mExp dt cK loc . fmap MemcachedUnkeyedLoc |] + +memcachedTimeoutByHere :: Q Exp +memcachedTimeoutByHere = do + loc <- location + [e| \mExp dt cK k -> fmap unMemcachedKeyedLoc . memcachedBy mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |] + +memcachedLimitedTimeout :: forall a k'' m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , MonadUnliftIO m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a, Binary a + ) + => Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> DiffTime + -> k'' + -> m a + -> m (Maybe a) +memcachedLimitedTimeout burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens + +memcachedLimitedKeyTimeout :: forall a k' k'' m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , MonadUnliftIO m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a, Binary a + , Typeable k', Hashable k', Eq k' + ) + => k' + -> Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> DiffTime + -> k'' + -> m a + -> m (Maybe a) +memcachedLimitedKeyTimeout lK burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) lK burst rate tokens + +memcachedLimitedTimeoutBy :: forall a k'' k m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , MonadUnliftIO m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a, Binary a + , Binary k + ) + => Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> DiffTime + -> k'' + -> k + -> m a + -> m (Maybe a) +memcachedLimitedTimeoutBy burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens + +memcachedLimitedKeyTimeoutBy :: forall a k' k'' k m. + ( MonadHandler m, HandlerSite m ~ UniWorX + , MonadThrow m + , MonadUnliftIO m + , Typeable k'', Hashable k'', Eq k'' + , Typeable a, Binary a + , Typeable k', Hashable k', Eq k' + , Binary k + ) + => k' + -> Word64 -- ^ burst-size (tokens) + -> Word64 -- ^ avg. inverse rate (usec/token) + -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform + -> Maybe Expiry + -> DiffTime + -> k'' + -> k + -> m a + -> m (Maybe a) +memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) lK burst rate tokens + +memcachedLimitedTimeoutHere :: Q Exp +memcachedLimitedTimeoutHere = do + loc <- location + [e| \burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |] + +memcachedLimitedKeyTimeoutHere :: Q Exp +memcachedLimitedKeyTimeoutHere = do + loc <- location + [e| \lK burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |] + +memcachedLimitedTimeoutByHere :: Q Exp +memcachedLimitedTimeoutByHere = do + loc <- location + [e| \burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |] + +memcachedLimitedKeyTimeoutByHere :: Q Exp +memcachedLimitedKeyTimeoutByHere = do + loc <- location + [e| \lK burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |] + diff --git a/src/Jobs.hs b/src/Jobs.hs index 0daaaadcb..ef140a7ad 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -85,11 +85,9 @@ handleJobs :: ( MonadResource m handleJobs foundation@UniWorX{..} | foundation ^. _appJobWorkers == 0 = return () | otherwise = do - UnliftIO{..} <- askUnliftIO + jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> manageJobPool foundation unmask - jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> unliftIO $ manageJobPool foundation unmask - - jobCron <- allocateLinkedAsync . unliftIO $ manageCrontab foundation + jobCron <- allocateLinkedAsync $ manageCrontab foundation let jobWorkers = Map.empty jobWorkerName = const $ error "Unknown worker" @@ -121,7 +119,7 @@ manageJobPool :: forall m. , MonadUnliftIO m , MonadMask m ) - => UniWorX -> (forall a. IO a -> IO a) -> m () + => UniWorX -> (forall a. m a -> m a) -> m () manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ flip runContT return . forever . join . atomically $ asum [ spawnMissingWorkers @@ -131,9 +129,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ where shutdownOnException :: m a -> m a shutdownOnException act = do - UnliftIO{..} <- askUnliftIO - - actAsync <- allocateLinkedAsyncMasked $ unliftIO act + actAsync <- allocateLinkedAsyncMasked act let handleExc e = do atomically $ do @@ -143,7 +139,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ void $ wait actAsync throwM e - liftIO (unmask $ wait actAsync) `catchAll` handleExc + unmask (wait actAsync) `catchAll` handleExc num :: Int num = fromIntegral $ foundation ^. _appJobWorkers @@ -178,7 +174,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $ $logInfoS logIdent "Started" runConduit $ streamChan .| handleJobs' workerId $logInfoS logIdent "Stopped" - worker <- lift . lift $ allocateLinkedAsync runWorker + worker <- lift . lift . allocateLinkedAsync $ liftIO runWorker tell . Endo $ \cSt -> cSt { jobWorkers = Map.insert worker chan $ jobWorkers cSt diff --git a/src/Jobs/HealthReport.hs b/src/Jobs/HealthReport.hs index 8707ea15a..90ddf8966 100644 --- a/src/Jobs/HealthReport.hs +++ b/src/Jobs/HealthReport.hs @@ -59,6 +59,10 @@ dispatchHealthCheckMatchingClusterConfig ourSetting <- getsYesod appClusterID dbSetting <- clusterSetting @'ClusterId return $ Just ourSetting == dbSetting + clusterSettingMatches ClusterMemcachedKey = do + ourSetting <- getsYesod $ fmap fst . appMemcached + dbSetting <- clusterSetting @'ClusterMemcachedKey + return $ maybe True ((== dbSetting) . Just) ourSetting clusterSetting :: forall key. diff --git a/src/Settings.hs b/src/Settings.hs index c176f2731..6e502928b 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -158,6 +158,8 @@ data AppSettings = AppSettings , appCookieSettings :: RegisteredCookie -> CookieSettings + , appMemcachedConf :: Maybe MemcachedConf + , appInitialInstanceID :: Maybe (Either FilePath UUID) , appRibbon :: Maybe Text } deriving Show @@ -418,6 +420,7 @@ instance FromJSON AppSettings where ] appWidgetMemcachedConf <- assertM validWidgetMemcachedConf <$> o .:? "widget-memcached" appSessionMemcachedConf <- assertM validMemcachedConf <$> o .:? "session-memcached" + appMemcachedConf <- assertM validMemcachedConf <$> o .:? "memcached" appRoot <- o .:? "approot" appHost <- fromString <$> o .: "host" appPort <- o .: "port" diff --git a/src/Settings/Cluster.hs b/src/Settings/Cluster.hs index 25cf564bc..bc72857ea 100644 --- a/src/Settings/Cluster.hs +++ b/src/Settings/Cluster.hs @@ -41,6 +41,7 @@ data ClusterSettingsKey | ClusterSecretBoxKey | ClusterJSONWebKeySet | ClusterId + | ClusterMemcachedKey deriving (Eq, Ord, Enum, Bounded, Show, Read) instance Universe ClusterSettingsKey @@ -122,3 +123,9 @@ instance ClusterSetting 'ClusterId where type ClusterSettingValue 'ClusterId = UUID initClusterSetting _ = liftIO getRandom knownClusterSetting _ = ClusterId + + +instance ClusterSetting 'ClusterMemcachedKey where + type ClusterSettingValue 'ClusterMemcachedKey = AEAD.Key + initClusterSetting _ = liftIO AEAD.newKey + knownClusterSetting _ = ClusterMemcachedKey diff --git a/src/UnliftIO/Async/Utils.hs b/src/UnliftIO/Async/Utils.hs index fb1dbc978..4b775e807 100644 --- a/src/UnliftIO/Async/Utils.hs +++ b/src/UnliftIO/Async/Utils.hs @@ -7,33 +7,34 @@ module UnliftIO.Async.Utils import ClassyPrelude hiding (cancel, async, link) import Control.Lens -import UnliftIO.Async +import qualified UnliftIO.Async as UnliftIO +import qualified Control.Concurrent.Async as A import Control.Monad.Trans.Resource allocateAsync :: forall m a. - MonadResource m - => IO a -> m (Async a) -allocateAsync = fmap (view _2) . flip allocate cancel . liftIO . async + ( MonadUnliftIO m, MonadResource m ) + => m a -> m (Async a) +allocateAsync act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel . A.async $ run act -allocateLinkedAsync :: forall m a. (MonadUnliftIO m, MonadResource m) => IO a -> m (Async a) -allocateLinkedAsync = uncurry (<$) . (id &&& link) <=< allocateAsync +allocateLinkedAsync :: forall m a. (MonadUnliftIO m, MonadResource m) => m a -> m (Async a) +allocateLinkedAsync = uncurry (<$) . (id &&& UnliftIO.link) <=< allocateAsync allocateAsyncWithUnmask :: forall m a. - MonadResource m - => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) -allocateAsyncWithUnmask act = fmap (view _2) . flip allocate cancel . liftIO $ asyncWithUnmask act + ( MonadUnliftIO m, MonadResource m ) + => ((forall b. m b -> m b) -> m a) -> m (Async a) +allocateAsyncWithUnmask act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel $ A.asyncWithUnmask $ \unmask -> run $ act (liftIO . unmask . run) -allocateLinkedAsyncWithUnmask :: forall m a. (MonadUnliftIO m, MonadResource m) => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) -allocateLinkedAsyncWithUnmask act = uncurry (<$) . (id &&& link) =<< allocateAsyncWithUnmask act +allocateLinkedAsyncWithUnmask :: forall m a. (MonadUnliftIO m, MonadResource m) => ((forall b. m b -> m b) -> m a) -> m (Async a) +allocateLinkedAsyncWithUnmask act = uncurry (<$) . (id &&& UnliftIO.link) =<< allocateAsyncWithUnmask act allocateAsyncMasked :: forall m a. - MonadResource m - => IO a -> m (Async a) -allocateAsyncMasked act = fmap (view _2) . flip allocate cancel . liftIO $ asyncWithUnmask (const act) + ( MonadUnliftIO m, MonadResource m ) + => m a -> m (Async a) +allocateAsyncMasked act = allocateAsyncWithUnmask (const act) -allocateLinkedAsyncMasked :: forall m a. (MonadUnliftIO m, MonadResource m) => IO a -> m (Async a) -allocateLinkedAsyncMasked = uncurry (<$) . (id &&& link) <=< allocateAsyncMasked +allocateLinkedAsyncMasked :: forall m a. (MonadUnliftIO m, MonadResource m) => m a -> m (Async a) +allocateLinkedAsyncMasked act = allocateLinkedAsyncWithUnmask (const act) diff --git a/stack.yaml b/stack.yaml index 298cfb02d..2e2664180 100644 --- a/stack.yaml +++ b/stack.yaml @@ -104,5 +104,7 @@ extra-deps: - unidecode-0.1.0.4 + - token-bucket-0.1.0.1 + resolver: lts-15.0 allow-newer: true diff --git a/stack.yaml.lock b/stack.yaml.lock index e8cc7d86d..82d6c2976 100644 --- a/stack.yaml.lock +++ b/stack.yaml.lock @@ -281,6 +281,13 @@ packages: sha256: 4959068a0caf410dd4b8046f0b0138e3cf6471abb0cc865c9993db3b2930d283 original: hackage: unidecode-0.1.0.4 +- completed: + hackage: token-bucket-0.1.0.1@sha256:d8e85f2fc373939975e7ace7907baee177531ab6e43df94e330a2357e64a2d11,1899 + pantry-tree: + size: 399 + sha256: b0b4a08ea1bf76bd108310f64d7f80e0f30b61ddc3d71f6cab7bdce329d2c1fa + original: + hackage: token-bucket-0.1.0.1 snapshots: - completed: size: 488576