-- SPDX-FileCopyrightText: 2022 Gregor Kleen
--
-- SPDX-License-Identifier: AGPL-3.0-or-later

{-# LANGUAGE DuplicateRecordFields #-}

-- | Encrypted, typed caching of 'Binary' values in memcached, fronted by an
-- optional process-local ARC cache, plus rate-limited and timeout-bounded
-- variants of the caching combinators.
module Handler.Utils.Memcached
  ( memcachedAvailable
  , memcached, memcachedBy
  , memcachedHere, memcachedByHere
  , memcachedSet, memcachedGet
  , memcachedInvalidate, memcachedByInvalidate
  , manageMemcachedLocalInvalidations
  , memcachedByGet, memcachedBySet
  , memcachedTimeout, memcachedTimeoutBy
  , memcachedTimeoutHere, memcachedTimeoutByHere
  , memcachedLimited, memcachedLimitedKey, memcachedLimitedBy, memcachedLimitedKeyBy
  , memcachedLimitedHere, memcachedLimitedKeyHere, memcachedLimitedByHere, memcachedLimitedKeyByHere
  , memcachedLimitedTimeout, memcachedLimitedKeyTimeout, memcachedLimitedTimeoutBy, memcachedLimitedKeyTimeoutBy
  , memcachedLimitedTimeoutHere, memcachedLimitedKeyTimeoutHere, memcachedLimitedTimeoutByHere, memcachedLimitedKeyTimeoutByHere
  , memcacheAuth, memcacheAuthHere
  , memcacheAuth', memcacheAuthHere'
  , memcacheAuthMax, memcacheAuthHereMax
  , Expiry
  , MemcachedException(..), AsyncTimeoutException(..)
  ) where

import Import.NoFoundation hiding (utc, exp)
import Foundation.Type

import qualified Database.Memcached.Binary.IO as Memcached

import Data.Bits (Bits(zeroBits), toIntegralSized)

import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime, getPOSIXTime, POSIXTime)

import qualified Data.Binary as Binary
import qualified Data.Binary.Put as Binary
import qualified Data.Binary.Get as Binary

import Crypto.Hash.Algorithms (SHAKE256)

import qualified Data.ByteArray as BA

import qualified Data.ByteString.Base64 as Base64

import Language.Haskell.TH hiding (Type)

import Data.Typeable (typeRep, typeRepFingerprint)
import Type.Reflection (typeOf, TypeRep)
import qualified Type.Reflection as Refl (typeRep)
import Data.Type.Equality (TestEquality(..))

import qualified Data.HashMap.Strict as HashMap

import qualified Control.Concurrent.TokenBucket as Concurrent (TokenBucket, newTokenBucket, tokenBucketTryAlloc)

import System.IO.Unsafe (unsafePerformIO)

import Control.Concurrent.STM.Delay

import qualified Crypto.Saltine.Class as Saltine
import qualified Crypto.Saltine.Internal.SecretBox as Saltine
import qualified Crypto.Saltine.Core.AEAD as AEAD

import qualified Control.Monad.State.Class as State

import qualified Data.ByteString.Lazy as Lazy (ByteString)

import GHC.Fingerprint

import Utils.Postgresql

import UnliftIO.Concurrent (threadDelay)


-- | Cache lifetime: either an absolute point in time ('Left') or a duration
-- relative to the moment of storing ('Right').
type Expiry = Either UTCTime DiffTime

-- | Conversion between 'Expiry' and the numeric expiry understood by the
-- memcached client.
--
-- Numbers up to 2592000 (30 days in seconds) are treated as relative
-- durations, larger numbers as absolute POSIX timestamps.  An 'Expiry' that
-- cannot be expressed under that convention (an absolute time at or before
-- that threshold, a non-positive duration, or a duration exceeding it) does
-- not match the prism.
_MemcachedExpiry :: Prism' Expiry Memcached.Expiry
_MemcachedExpiry = prism' fromExpiry toExpiry
  where
    toExpiry (Left utc)
      | posix > 2592000 = toIntegralSized posix
      | otherwise = Nothing
      where
        posix :: Integer
        posix = ceiling $ utcTimeToPOSIXSeconds utc
    toExpiry (Right dTime)
      | 0 < dTime
      , dTime <= 2592000
      = Just $ ceiling dTime
      | otherwise
      = Nothing

    fromExpiry n
      | n <= 2592000
      = Right $ fromIntegral n
      | otherwise
      = Left . posixSecondsToUTCTime $ fromIntegral n

-- | Wire representation of a cached value: nonce, optional absolute expiry
-- and the AEAD ciphertext of the 'Binary'-encoded payload.
data MemcachedValue = MemcachedValue
  { mNonce :: AEAD.Nonce
  , mExpiry :: Maybe POSIXTime
  , mCiphertext :: ByteString
  } deriving (Generic)

-- | Serialise an optional expiry as a 'Word64' of whole seconds (rounded up);
-- 'Nothing' is written as @0@.
--
-- Calls 'error' if the expiry is not strictly between @0@ and
-- @maxBound :: Word64@.
putExpiry :: Maybe POSIXTime -> Binary.Put
putExpiry mExp = Binary.put $ fromMaybe 0 expEnc
  where
    expEnc :: Maybe Word64
    expEnc = mExp <&> \exp ->
      let expEnc' :: Integer
          expEnc' = ceiling exp
       in if | 0 < expEnc', expEnc' < fromIntegral (maxBound :: Word64)
               -> fromIntegral expEnc'
             | otherwise
               -> error "Expiry cannot be represented in 64 unsigned bits"

-- | Inverse of 'putExpiry': reads a 'Word64', mapping @0@ to 'Nothing'.
getExpiry :: Binary.Get (Maybe POSIXTime)
getExpiry = Binary.label "expiry" $ do
  mExpiry' <- Binary.get :: Binary.Get Word64
  return $ if
    | mExpiry' == 0 -> Nothing
    | otherwise -> Just $ fromIntegral mExpiry'

-- | Serialise a 'MemcachedValue' as nonce, expiry, ciphertext (in that order).
putMemcachedValue :: MemcachedValue -> Binary.Put
putMemcachedValue MemcachedValue{..} = do
  Binary.putByteString $ Saltine.encode mNonce
  putExpiry mExpiry
  Binary.putByteString mCiphertext

-- | Parsers for stored values.  'getMemcachedValue' expects the layout written
-- by 'putMemcachedValue'; 'getMemcachedValueNoExpiry' expects nonce followed
-- directly by ciphertext and always yields @mExpiry = Nothing@.
--
-- NOTE(review): the look-ahead length check reserves 4 bytes in addition to
-- nonce and MAC in both parsers, whereas 'putExpiry' writes a 'Word64'.  It
-- acts only as a lower bound; confirm whether the constant is intentional.
getMemcachedValue, getMemcachedValueNoExpiry :: Binary.Get MemcachedValue
getMemcachedValue = do
  Binary.lookAhead . Binary.label "length check" $ do
    void . Binary.getByteString $ Saltine.secretbox_noncebytes + 4 + Saltine.secretbox_macbytes
  mNonce <- Binary.label "nonce" $ Binary.getByteString Saltine.secretbox_noncebytes >>= hoistMaybe . Saltine.decode
  mExpiry <- getExpiry
  mCiphertext <- Binary.label "ciphertext" $ toStrict <$> Binary.getRemainingLazyByteString
  return MemcachedValue{..}
getMemcachedValueNoExpiry = do
  Binary.lookAhead . Binary.label "length check" $ do
    void . Binary.getByteString $ Saltine.secretbox_noncebytes + 4 + Saltine.secretbox_macbytes
  mNonce <- Binary.label "nonce" $ Binary.getByteString Saltine.secretbox_noncebytes >>= hoistMaybe . Saltine.decode
  let mExpiry = Nothing
  mCiphertext <- Binary.label "ciphertext" $ toStrict <$> Binary.getRemainingLazyByteString
  return MemcachedValue{..}


-- | Whether the application has a memcached connection configured.
memcachedAvailable :: ( MonadHandler m, HandlerSite m ~ UniWorX )
                   => m Bool
memcachedAvailable = getsYesod $ is _Just . appMemcached


-- | Errors raised by this module.
data MemcachedException
  = MemcachedException Memcached.MemcachedException
  | MemcachedInvalidExpiry Expiry -- ^ The expiry does not match '_MemcachedExpiry'
  deriving (Show)
  deriving anyclass (Exception)


-- | Derive the memcached key: a KMAC (SHAKE256, 256 bit) of the encoded user
-- key, keyed with the encryption key and customised with the shown type of
-- the cached value.
toMemcachedKey :: Typeable a => AEAD.Key -> Proxy a -> Lazy.ByteString -> ByteString
toMemcachedKey (Saltine.encode -> kmacKey) p = BA.convert . kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey

-- | Additional authenticated data for the AEAD: memcached key followed by the
-- encoded expiry.
memcachedAAD :: ByteString -> Maybe POSIXTime -> ByteString
memcachedAAD cKey mExpiry = toStrict . Binary.runPut $ do
  Binary.putByteString cKey
  putExpiry mExpiry

-- | Look up a cached value by key.
--
-- The local ARC cache (if configured) is consulted first; on a miss there,
-- memcached is queried, the value is decoded (first with, then without an
-- expiry field), checked for expiry, decrypted, and inserted into the local
-- ARC.  Any failure along the way yields 'Nothing'.
memcachedByGet :: forall a k m.
                  ( MonadHandler m, HandlerSite m ~ UniWorX
                  , Typeable a, Binary a, NFData a
                  , Binary k
                  )
               => k -> m (Maybe a)
memcachedByGet (Binary.encode -> k) = runMaybeT $ arc <|> memcache
  where
    arc = do
      AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal
      res <- hoistMaybe . preview (_1 . _NFDynamic) <=< hoistMaybe <=< cachedARC' memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) $ \mPrev -> do
        prev@((_, prevExpiry), _) <- hoistMaybe mPrev
        $logDebugS "memcached" "Cache hit (local ARC)"
        lift . runMaybeT $ do -- To delete from ARC upon expiry
          for_ prevExpiry $ \expiry -> do
            now <- liftIO getPOSIXTime
            guard $ expiry > now
          return prev
      $logDebugS "memcached" "All valid (local ARC)"
      return res

    memcache = do
      AppMemcached{..} <- MaybeT $ getsYesod appMemcached
      localARC <- getsYesod appMemcachedLocal
      let cKey = toMemcachedKey memcachedKey (Proxy @a) k

      encVal <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey memcachedConn

      $logDebugS "memcached" "Cache hit"

      let withExp doExp = do
            MemcachedValue{..} <- hoistMaybe . flip runGetMaybe encVal $ bool getMemcachedValueNoExpiry getMemcachedValue doExp
            $logDebugS "memcached" "Decode valid"
            for_ mExpiry $ \expiry -> do
              now <- liftIO getPOSIXTime
              guard $ expiry > now + clockLeniency
            $logDebugS "memcached" $ "Expiry valid: " <> tshow mExpiry
            let aad = memcachedAAD cKey mExpiry
            decrypted <- hoistMaybe $ AEAD.aeadOpen memcachedKey mNonce mCiphertext aad

            $logDebugS "memcached" $ "Decryption valid " <> bool "without" "with" doExp <> " expiration"

            let withCache = case localARC of
                  Just AppMemcachedLocal{..} -> cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k)
                  Nothing -> fmap (view _1) . ($ Nothing)
            res <- hoistMaybe . preview (_1 . _NFDynamic) <=< withCache $ \case
              Nothing -> fmap ((, length decrypted) . (, mExpiry) . review (_NFDynamic @a)) . hoistMaybe $ runGetMaybe Binary.get decrypted
              Just p -> return p

            $logDebugS "memcached" "All valid"

            return res

      withExp True <|> withExp False
      where
        -- Run a parser, succeeding only if it consumes the entire input.
        runGetMaybe p (fromStrict -> bs) = case Binary.runGetOrFail p bs of
          Right (bs', _, x) | null bs' -> Just x
          _other -> Nothing
        -- Entries expiring within this margin are already treated as expired.
        clockLeniency :: NominalDiffTime
        clockLeniency = 2

-- | Store a value under a key in memcached (encrypted) and in the local ARC
-- cache, each only if configured.
--
-- Throws 'MemcachedInvalidExpiry' if the expiry does not match
-- '_MemcachedExpiry'.  Exceptions of type 'Memcached.MemcachedException'
-- raised while storing are discarded.
memcachedBySet :: forall a k m.
                  ( MonadHandler m, HandlerSite m ~ UniWorX
                  , MonadThrow m
                  , Typeable a, Binary a, NFData a
                  , Binary k
                  )
               => Maybe Expiry -> k -> a -> m ()
memcachedBySet mExp (Binary.encode -> k) v = do
  mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry
  let decrypted = toStrict $ Binary.encode v
  -- Absolute expiry, stored alongside the value and authenticated via the AAD
  mExpiry <- for mExp $ \case
    Left uTime -> return $ utcTimeToPOSIXSeconds uTime
    Right diff -> liftIO $ (+ realToFrac diff) <$> getPOSIXTime
  mConn <- getsYesod appMemcached
  for_ mConn $ \AppMemcached{..} -> do
    mNonce <- liftIO AEAD.newNonce
    let cKey = toMemcachedKey memcachedKey (Proxy @a) k
        aad = memcachedAAD cKey mExpiry
        mCiphertext = AEAD.aead memcachedKey mNonce decrypted aad
    liftIO . handle (\(_ :: Memcached.MemcachedException) -> return ()) $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (Binary.runPut $ putMemcachedValue MemcachedValue{..}) memcachedConn
    $logDebugS "memcached" $ "Cache store: " <> tshow mExpiry
  mLocal <- getsYesod appMemcachedLocal
  for_ mLocal $ \AppMemcachedLocal{..} -> do
    void . cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) . const $ return ((_NFDynamic # v, mExpiry), length decrypted)
    $logDebugS "memcached" $ "Cache store: " <> tshow mExpiry <> " (local ARC)"
    -- DEBUG
    let inv = Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..}
          where mLocalInvalidateType = typeRepFingerprint . typeRep $ Proxy @a
                mLocalInvalidateKey = k
    $logDebugS "memcached" $ "To invalidate remotely: " <> tshow inv

-- | Remove the value of type @a@ stored under the given key from the local
-- ARC cache and from memcached, and enqueue the key on the local invalidation
-- queue (see 'manageMemcachedLocalInvalidations').
memcachedByInvalidate :: forall a k m p.
                         ( MonadHandler m, HandlerSite m ~ UniWorX
                         , Typeable a
                         , Binary k
                         )
                      => k -> p a -> m ()
memcachedByInvalidate (Binary.encode -> k) _ = arc >> memcache
  where
    memcache = maybeT_ $ do
      AppMemcached{..} <- MaybeT $ getsYesod appMemcached
      let cKey = toMemcachedKey memcachedKey (Proxy @a) k
      hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey memcachedConn
      $logDebugS "memcached" "Cache invalidation"
    arc = maybeT_ $ do
      AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal
      let arcKey = (typeRepFingerprint . typeRep $ Proxy @a, k)
      atomically $ modifyTVar' memcachedLocalInvalidationQueue (:> arcKey)
      void . cachedARC' memcachedLocalARC arcKey . const $ return Nothing
      $logDebugS "memcached" "Cache invalidation (local ARC)"

-- | Message identifying one local-ARC entry (type fingerprint and encoded
-- key) to be invalidated.
data MemcachedLocalInvalidateMsg = MemcachedLocalInvalidateMsg
  { mLocalInvalidateType :: Fingerprint
  , mLocalInvalidateKey :: Lazy.ByteString
  } deriving (Eq, Ord, Show)

-- | Two little-endian 'Word64's for the fingerprint, followed by the key
-- occupying the remainder of the input.
instance Binary MemcachedLocalInvalidateMsg where
  get = Binary.label "MemcachedLocalInvalidateMsg" $ do
    mLocalInvalidateType <- Binary.label "mLocalInvalidateType" $ Fingerprint <$> Binary.getWord64le <*> Binary.getWord64le
    mLocalInvalidateKey <- Binary.label "mLocalInvalidateKey" Binary.getRemainingLazyByteString
    return MemcachedLocalInvalidateMsg{..}
  put MemcachedLocalInvalidateMsg{..} = do
    let Fingerprint w1 w2 = mLocalInvalidateType
    Binary.putWord64le w1
    Binary.putWord64le w2
    Binary.putLazyByteString mLocalInvalidateKey

-- | Channel manager exchanging local-ARC invalidations as Base64-encoded
-- 'MemcachedLocalInvalidateMsg's: incoming messages evict the corresponding
-- ARC entry, queued invalidations are emitted as output one at a time.
manageMemcachedLocalInvalidations :: ( MonadUnliftIO m
                                     , MonadLogger m
                                     )
                                  => ARCHandle (Fingerprint, Lazy.ByteString) Int (NFDynamic, Maybe POSIXTime)
                                  -> TVar (Seq (Fingerprint, Lazy.ByteString))
                                  -> PostgresqlChannelManager m ()
manageMemcachedLocalInvalidations localARC iQueue = PostgresqlChannelManager
  { pgcTerminate = forever $ threadDelay maxBound
  , pgcOnInput = Just $ \inpBS -> case Binary.runGetOrFail Binary.get . fromStrict <$> Base64.decode inpBS of
      Right (Right (bs', _, MemcachedLocalInvalidateMsg{..}))
        | null bs' -> void . cachedARC' localARC (mLocalInvalidateType, mLocalInvalidateKey) $ \mPrev -> do
            $logDebugS "memcached" $ "Remote invalidation in local ARC: " <> bool "miss" "hit" (is _Just mPrev)
            return Nothing
      _other -> $logErrorS "memcached" $ "Received unparseable remote invalidation: " <> tshow inpBS
  , pgcGenOutput = atomically $ do
      iQueue' <- readTVar iQueue
      -- Retries (via 'mzero') while the queue is empty
      i <- case iQueue' of
        i :< is' -> i <$ writeTVar iQueue is'
        _other -> mzero
      let (mLocalInvalidateType, mLocalInvalidateKey) = i
      return . Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..}
  }


-- | Wrapper distinguishing values cached without an explicit key (i.e. under
-- key @()@) by type.
newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a }
  deriving newtype (Eq, Ord, Show, Binary)
instance NFData a => NFData (MemcachedUnkeyed a) where
  rnf = rnf . unMemcachedUnkeyed

-- | Like 'memcachedByGet', keyed only by the type of the value.
memcachedGet :: ( MonadHandler m, HandlerSite m ~ UniWorX
                , Typeable a, Binary a, NFData a
                )
             => m (Maybe a)
memcachedGet = fmap unMemcachedUnkeyed <$> memcachedByGet ()

-- | Like 'memcachedBySet', keyed only by the type of the value.
memcachedSet :: ( MonadHandler m, HandlerSite m ~ UniWorX
                , MonadThrow m
                , Typeable a, Binary a, NFData a
                )
             => Maybe Expiry -> a -> m ()
memcachedSet mExp = memcachedBySet mExp () . MemcachedUnkeyed

-- | Like 'memcachedByInvalidate', keyed only by the type of the value.
memcachedInvalidate :: forall (a :: Type) m p.
                       ( MonadHandler m, HandlerSite m ~ UniWorX
                       , Typeable a
                       )
                    => p a -> m ()
memcachedInvalidate _ = memcachedByInvalidate () $ Proxy @(MemcachedUnkeyed a)


-- | Return the result of the getter if it yields one; otherwise run the
-- action and pass its result through the setter.
memcachedWith :: Monad m
              => (m (Maybe b), a -> m b) -> m a -> m b
memcachedWith (doGet, doSet) act = do
  pRes <- doGet
  maybe id (const . return) pRes $ do
    res <- act
    doSet res

-- | Cache the result of an action, keyed only by its type.
memcached :: ( MonadHandler m, HandlerSite m ~ UniWorX
             , MonadThrow m
             , Typeable a, Binary a, NFData a
             )
          => Maybe Expiry -> m a -> m a
memcached mExp = memcachedWith (memcachedGet, \x -> x <$ memcachedSet mExp x)

-- | Cache the result of an action under the given key.
memcachedBy :: forall a m k.
               ( MonadHandler m, HandlerSite m ~ UniWorX
               , MonadThrow m
               , Typeable a, Binary a, NFData a
               , Binary k
               )
            => Maybe Expiry -> k -> m a -> m a
memcachedBy mExp k = memcachedWith (memcachedByGet k, \x -> x <$ memcachedBySet mExp k x)

-- | Wrapper for values cached under a source location only.
newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a }
  deriving newtype (Eq, Ord, Show, Binary)
instance NFData a => NFData (MemcachedUnkeyedLoc a) where
  rnf MemcachedUnkeyedLoc{..} = rnf unMemcachedUnkeyedLoc

-- | Splice for 'memcachedBy' keyed by the source location of the splice.
memcachedHere :: Q Exp
memcachedHere = do
  loc <- location
  [e| \mExp -> fmap unMemcachedUnkeyedLoc . memcachedBy mExp loc . fmap MemcachedUnkeyedLoc |]

-- | Wrapper for values cached under a source location paired with a key.
newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a }
  deriving newtype (Eq, Ord, Show, Binary)
instance NFData a => NFData (MemcachedKeyedLoc a) where
  rnf MemcachedKeyedLoc{..} = rnf unMemcachedKeyedLoc

-- | Run a transformation on 'MemcachedKeyedLoc'-wrapped values, wrapping its
-- input and unwrapping its output.
withMemcachedKeyedLoc :: Functor f => (f (MemcachedKeyedLoc a) -> f (MemcachedKeyedLoc a)) -> (f a -> f a)
withMemcachedKeyedLoc act = fmap unMemcachedKeyedLoc . act . fmap MemcachedKeyedLoc
{-# INLINE withMemcachedKeyedLoc #-}

-- | Variant of 'withMemcachedKeyedLoc' for transformations whose result is
-- wrapped in a further functor (e.g. 'Maybe').
withMemcachedKeyedLoc' :: (Functor f, Functor f') => (f (MemcachedKeyedLoc a) -> f (f' (MemcachedKeyedLoc a))) -> (f a -> f (f' a))
withMemcachedKeyedLoc' act = fmap (fmap unMemcachedKeyedLoc) . act . fmap MemcachedKeyedLoc
{-# INLINE withMemcachedKeyedLoc' #-}

-- | Splice for 'memcachedBy' keyed by the source location of the splice
-- paired with a user-supplied key.
memcachedByHere :: Q Exp
memcachedByHere = do
  loc <- location
  [e| \mExp k -> withMemcachedKeyedLoc (memcachedBy mExp (loc, k)) |]


-- | A value of arbitrary hashable type, tagged with its type representation
-- so that values of different types never compare equal.
data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a
instance Hashable HashableDynamic where
  hashWithSalt s (HashableDynamic tRep v) = s `hashWithSalt` tRep `hashWithSalt` v
instance Eq HashableDynamic where
  (HashableDynamic tRep v) == (HashableDynamic tRep' v') = case testEquality tRep tRep' of
    Just Refl -> v == v'
    Nothing -> False

hashableDynamic :: forall a.
                   ( Typeable a, Hashable a, Eq a )
                => a -> HashableDynamic
hashableDynamic v = HashableDynamic (typeOf v) v

-- | Process-global map of token buckets, one per limiting key.
--
-- 'unsafePerformIO' only allocates the 'TVar'; @NOINLINE@ keeps it a single
-- shared instance.
memcachedLimit :: TVar (HashMap HashableDynamic Concurrent.TokenBucket)
memcachedLimit = unsafePerformIO . newTVarIO $ HashMap.empty
{-# NOINLINE memcachedLimit #-}

-- | Rate-limited caching: on a cache hit the cached value is returned; on a
-- miss the action is run (and its result stored) only if the token bucket
-- for the limiting key grants the requested tokens, otherwise 'Nothing' is
-- returned.
memcachedLimitedWith :: ( MonadIO m
                        , MonadLogger m
                        , Typeable k', Hashable k', Eq k'
                        )
                     => (m (Maybe a), a -> m ())
                     -> (m a -> MaybeT m a) -- ^ Wrap execution on cache miss
                     -> k' -- ^ Key for limiting
                     -> Word64 -- ^ burst-size (tokens)
                     -> Word64 -- ^ avg. inverse rate (usec/token)
                     -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                     -> m a
                     -> m (Maybe a)
memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate tokens act = runMaybeT $ do
  pRes <- lift doGet
  maybe id (const . return) pRes $ do
    mBucket <- fmap (HashMap.lookup lK) . liftIO $ readTVarIO memcachedLimit
    bucket <- case mBucket of
      Just bucket -> return bucket
      Nothing -> liftIO $ do
        bucket <- Concurrent.newTokenBucket
        atomically $ do
          hm <- readTVar memcachedLimit
          -- Keep a bucket inserted concurrently by another thread, if any
          let hm' = HashMap.insertWith (const id) lK bucket hm
          writeTVar memcachedLimit $! hm'
          return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm'
    sufficientTokens <- liftIO $ Concurrent.tokenBucketTryAlloc bucket burst rate tokens
    $logDebugS "memcachedLimitedWith" $ "Sufficient tokens: " <> tshow sufficientTokens
    guard sufficientTokens

    liftAct $ do
      res <- act
      doSet res
      return res

-- | Rate-limited 'memcached'; limiting is keyed by the result type.
memcachedLimited :: forall a m.
                    ( MonadHandler m, HandlerSite m ~ UniWorX
                    , MonadThrow m
                    , Typeable a, Binary a, NFData a
                    )
                 => Word64 -- ^ burst-size (tokens)
                 -> Word64 -- ^ avg. inverse rate (usec/token)
                 -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                 -> Maybe Expiry
                 -> m a
                 -> m (Maybe a)
memcachedLimited burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift (Proxy @a) burst rate tokens

-- | Rate-limited 'memcached' with an explicit limiting key.
memcachedLimitedKey :: forall a k' m.
                       ( MonadHandler m, HandlerSite m ~ UniWorX
                       , MonadThrow m
                       , Typeable a, Binary a, NFData a
                       , Typeable k', Hashable k', Eq k'
                       )
                    => k'
                    -> Word64 -- ^ burst-size (tokens)
                    -> Word64 -- ^ avg. inverse rate (usec/token)
                    -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                    -> Maybe Expiry
                    -> m a
                    -> m (Maybe a)
memcachedLimitedKey lK burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift lK burst rate tokens

-- | Rate-limited 'memcachedBy'; limiting is keyed by the result type.
memcachedLimitedBy :: forall a k m.
                      ( MonadHandler m, HandlerSite m ~ UniWorX
                      , MonadThrow m
                      , Typeable a, Binary a, NFData a
                      , Binary k
                      )
                   => Word64 -- ^ burst-size (tokens)
                   -> Word64 -- ^ avg. inverse rate (usec/token)
                   -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                   -> Maybe Expiry
                   -> k
                   -> m a
                   -> m (Maybe a)
memcachedLimitedBy burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift (Proxy @a) burst rate tokens

-- | Rate-limited 'memcachedBy' with an explicit limiting key.
memcachedLimitedKeyBy :: forall a k' k m.
                         ( MonadHandler m, HandlerSite m ~ UniWorX
                         , MonadThrow m
                         , Typeable a, Binary a, NFData a
                         , Typeable k', Hashable k', Eq k'
                         , Binary k
                         )
                      => k'
                      -> Word64 -- ^ burst-size (tokens)
                      -> Word64 -- ^ avg. inverse rate (usec/token)
                      -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                      -> Maybe Expiry
                      -> k
                      -> m a
                      -> m (Maybe a)
memcachedLimitedKeyBy lK burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift lK burst rate tokens

-- | Splice for 'memcachedLimitedBy' keyed by the source location.
memcachedLimitedHere :: Q Exp
memcachedLimitedHere = do
  loc <- location
  [e| \burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedBy burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |]

-- | Splice for 'memcachedLimitedKeyBy' keyed by the source location.
memcachedLimitedKeyHere :: Q Exp
memcachedLimitedKeyHere = do
  loc <- location
  [e| \lK burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |]

-- | Splice for 'memcachedLimitedBy' keyed by source location and user key.
memcachedLimitedByHere :: Q Exp
memcachedLimitedByHere = do
  loc <- location
  [e| \burst rate tokens mExp k -> withMemcachedKeyedLoc' (memcachedLimitedBy burst rate tokens mExp (loc, k)) |]

-- | Splice for 'memcachedLimitedKeyBy' keyed by source location and user key.
memcachedLimitedKeyByHere :: Q Exp
memcachedLimitedKeyByHere = do
  loc <- location
  [e| \lK burst rate tokens mExp k -> withMemcachedKeyedLoc' (memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k)) |]


-- | Cache the result of a computation that reports, via the writer, the
-- expiry to cache it with.
--
-- The result is wrapped in 'cachedByBinary'.  If the application setting
-- behind @_appMemcacheAuth@ is disabled, or the computation reports no
-- expiry, nothing is stored in memcached.
memcacheAuth :: forall m k a.
                ( MonadHandler m, HandlerSite m ~ UniWorX
                , MonadThrow m
                , Typeable a, Binary a, NFData a
                , Binary k
                )
             => k
             -> WriterT (Maybe (Min Expiry)) m a
             -> m a
memcacheAuth k mx = cachedByBinary k $ do
  mayCache <- getsYesod $ view _appMemcacheAuth
  if | mayCache -> memcachedWith
         ( memcachedByGet k
         , \(x, mExp) -> x <$ case mExp of
             Nothing -> return ()
             Just (Min exp) -> memcachedBySet (Just exp) k x
         ) $ runWriterT mx
     | otherwise -> evalWriterT mx

-- | 'memcacheAuth' for a plain computation with a fixed expiry.
memcacheAuth' :: forall a m k.
                 ( MonadHandler m, HandlerSite m ~ UniWorX
                 , MonadThrow m
                 , Typeable a, Binary a, NFData a
                 , Binary k
                 )
              => Expiry
              -> k
              -> m a
              -> m a
memcacheAuth' exp k = memcacheAuth k . (<* tell (Just $ Min exp)) . lift

-- | 'memcacheAuth' with an upper bound on the expiry: the given expiry is
-- reported in addition to whatever the computation reports.
memcacheAuthMax :: forall m k a.
                   ( MonadHandler m, HandlerSite m ~ UniWorX
                   , MonadThrow m
                   , Typeable a, Binary a, NFData a
                   , Binary k
                   )
                => Expiry
                -> k
                -> WriterT (Maybe (Min Expiry)) m a
                -> m a
memcacheAuthMax exp k = memcacheAuth k . (tell (Just $ Min exp) *>)

-- | Splice for 'memcacheAuth' keyed by source location and user key.
memcacheAuthHere :: Q Exp
memcacheAuthHere = do
  loc <- location
  [e| \k -> withMemcachedKeyedLoc (memcacheAuth (loc, k)) |]

-- | Splice for 'memcacheAuth'' keyed by source location and user key.
memcacheAuthHere' :: Q Exp
memcacheAuthHere' = do
  loc <- location
  [e| \exp k -> withMemcachedKeyedLoc (memcacheAuth' exp (loc, k)) |]

-- | Splice for 'memcacheAuthMax' keyed by source location and user key.
memcacheAuthHereMax :: Q Exp
memcacheAuthHereMax = do
  loc <- location
  [e| \exp k -> withMemcachedKeyedLoc (memcacheAuthMax exp (loc, k)) |]


-- | Thrown when a computation key is reused for a computation with a
-- different result type.
data AsyncTimeoutException
  = AsyncTimeoutReturnTypeDoesNotMatchComputationKey
  deriving (Show)
  deriving anyclass (Exception)

-- | An 'Async' of arbitrary result type, tagged with its type representation.
data DynamicAsync = forall a. DynamicAsync !(TypeRep a) !(Async a)
instance Eq DynamicAsync where
  (DynamicAsync tRep v) == (DynamicAsync tRep' v') = case testEquality tRep tRep' of
    Just Refl -> v == v'
    Nothing -> False

-- | Process-global map of background computations, one per computation key.
--
-- 'unsafePerformIO' only allocates the 'TVar'; @NOINLINE@ keeps it a single
-- shared instance.
memcachedAsync :: TVar (HashMap HashableDynamic DynamicAsync)
memcachedAsync = unsafePerformIO . newTVarIO $ HashMap.empty
{-# NOINLINE memcachedAsync #-}

-- | Run an action in the background, waiting at most the given time for its
-- result.
--
-- If memcached is unavailable the action is simply run in the foreground.
-- Otherwise at most one unresolved 'Async' is kept per computation key: an
-- existing one is reused, and a freshly spawned one that lost the race to
-- register itself is cancelled before it starts the action.  On timeout
-- 'Nothing' is returned; the background computation is not cancelled here.
--
-- Throws 'AsyncTimeoutReturnTypeDoesNotMatchComputationKey' if the key is
-- already registered with a different result type.
liftAsyncTimeout :: forall k'' a m.
                    ( MonadHandler m, HandlerSite m ~ UniWorX
                    , MonadUnliftIO m
                    , MonadThrow m
                    , Typeable k'', Hashable k'', Eq k''
                    , Typeable a
                    )
                 => DiffTime
                 -> k''
                 -> m a
                 -> MaybeT m a
liftAsyncTimeout dt (hashableDynamic -> cK) act = ifNotM memcachedAvailable (lift act) $ do
  -- Timeout in seconds, converted to microseconds
  delay <- liftIO . newDelay . round $ toRational dt * 1e6

  act' <- lift $ do
    existing <- traverse castDynamicAsync . HashMap.lookup cK <=< liftIO $ readTVarIO memcachedAsync
    case existing of
      Just act' -> return act'
      Nothing -> do
        -- The new thread waits for confirmation that it was registered before
        -- actually running the action.
        startAct <- liftIO newEmptyTMVarIO
        act' <- async $ do
          $logDebugS "liftAsyncTimeout" "Waiting for confirmation..."
          atomically $ takeTMVar startAct
          $logDebugS "liftAsyncTimeout" "Confirmed."
          act
        act'' <- atomically $ do
          hm <- readTVar memcachedAsync
          let new = DynamicAsync (Refl.typeRep @a) act'
              -- Replace a registered computation only if it has already
              -- resolved; otherwise keep it and report it via the state.
              go mOld = case mOld of
                Just old' -> do
                  old <- castDynamicAsync old'
                  resolved <- lift $ is _Just <$> pollSTM old
                  if | resolved -> return $ Just new
                     | otherwise -> do
                         State.put old
                         return $ Just old'
                Nothing -> return $ Just new
          (hm', act'') <- runStateT (HashMap.alterF go cK hm) act'
          writeTVar memcachedAsync $! hm'
          return act''
        if | act' == act'' -> atomically $ putTMVar startAct ()
           | otherwise -> cancel act'
        return act''

  MaybeT . atomically $ (Nothing <$ waitDelay delay) <|> (Just <$> waitSTM act')
  where
    castDynamicAsync :: forall m'. MonadThrow m' => DynamicAsync -> m' (Async a)
    castDynamicAsync (DynamicAsync tRep v)
      | Just Refl <- testEquality tRep (Refl.typeRep @a)
      = return v
      | otherwise
      = throwM AsyncTimeoutReturnTypeDoesNotMatchComputationKey

-- | Timeout-bounded caching: on a cache hit the cached value is returned; on
-- a miss the action is run (and its result stored) via 'liftAsyncTimeout'.
memcachedTimeoutWith :: ( MonadHandler m, HandlerSite m ~ UniWorX
                        , MonadUnliftIO m
                        , MonadThrow m
                        , Typeable k'', Hashable k'', Eq k''
                        , Typeable a
                        )
                     => (m (Maybe a), a -> m ()) -> DiffTime -> k'' -> m a -> m (Maybe a)
memcachedTimeoutWith (doGet, doSet) dt cK act = runMaybeT $ do
  pRes <- lift doGet
  maybe id (const . return) pRes $
    liftAsyncTimeout dt cK $ do
      res <- act
      doSet res
      return res

-- | Timeout-bounded 'memcached'.
memcachedTimeout :: ( MonadHandler m, HandlerSite m ~ UniWorX
                    , MonadThrow m
                    , MonadUnliftIO m
                    , Typeable k'', Hashable k'', Eq k''
                    , Typeable a, Binary a, NFData a
                    )
                 => Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a)
memcachedTimeout mExp = memcachedTimeoutWith (memcachedGet, memcachedSet mExp)

-- | Timeout-bounded 'memcachedBy'.
memcachedTimeoutBy :: ( MonadHandler m, HandlerSite m ~ UniWorX
                      , MonadThrow m
                      , MonadUnliftIO m
                      , Typeable k'', Hashable k'', Eq k''
                      , Typeable a, Binary a, NFData a
                      , Binary k
                      )
                   => Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a)
memcachedTimeoutBy mExp dt cK k = memcachedTimeoutWith (memcachedByGet k, memcachedBySet mExp k) dt cK

-- | Splice for 'memcachedTimeoutBy' keyed by the source location.
--
-- The result of 'memcachedTimeoutBy' is wrapped in 'Maybe', so the newtype is
-- removed underneath it.
memcachedTimeoutHere :: Q Exp
memcachedTimeoutHere = do
  loc <- location
  [e| \mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedTimeoutBy mExp dt cK loc . fmap MemcachedUnkeyedLoc |]

-- | Splice for 'memcachedTimeoutBy' keyed by source location and user key.
memcachedTimeoutByHere :: Q Exp
memcachedTimeoutByHere = do
  loc <- location
  [e| \mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedTimeoutBy mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]

-- | Rate-limited and timeout-bounded 'memcached'; limiting is keyed by the
-- result type.
memcachedLimitedTimeout :: forall a k'' m.
                           ( MonadHandler m, HandlerSite m ~ UniWorX
                           , MonadThrow m
                           , MonadUnliftIO m
                           , Typeable k'', Hashable k'', Eq k''
                           , Typeable a, Binary a, NFData a
                           )
                        => Word64 -- ^ burst-size (tokens)
                        -> Word64 -- ^ avg. inverse rate (usec/token)
                        -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                        -> Maybe Expiry
                        -> DiffTime
                        -> k''
                        -> m a
                        -> m (Maybe a)
memcachedLimitedTimeout burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens

-- | Rate-limited and timeout-bounded 'memcached' with an explicit limiting
-- key.
memcachedLimitedKeyTimeout :: forall a k' k'' m.
                              ( MonadHandler m, HandlerSite m ~ UniWorX
                              , MonadThrow m
                              , MonadUnliftIO m
                              , Typeable k'', Hashable k'', Eq k''
                              , Typeable a, Binary a, NFData a
                              , Typeable k', Hashable k', Eq k'
                              )
                           => k'
                           -> Word64 -- ^ burst-size (tokens)
                           -> Word64 -- ^ avg. inverse rate (usec/token)
                           -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                           -> Maybe Expiry
                           -> DiffTime
                           -> k''
                           -> m a
                           -> m (Maybe a)
memcachedLimitedKeyTimeout lK burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) lK burst rate tokens

-- | Rate-limited and timeout-bounded 'memcachedBy'; limiting is keyed by the
-- result type.
memcachedLimitedTimeoutBy :: forall a k'' k m.
                             ( MonadHandler m, HandlerSite m ~ UniWorX
                             , MonadThrow m
                             , MonadUnliftIO m
                             , Typeable k'', Hashable k'', Eq k''
                             , Typeable a, Binary a, NFData a
                             , Binary k
                             )
                          => Word64 -- ^ burst-size (tokens)
                          -> Word64 -- ^ avg. inverse rate (usec/token)
                          -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                          -> Maybe Expiry
                          -> DiffTime
                          -> k''
                          -> k
                          -> m a
                          -> m (Maybe a)
memcachedLimitedTimeoutBy burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens

-- | Rate-limited and timeout-bounded 'memcachedBy' with an explicit limiting
-- key.
memcachedLimitedKeyTimeoutBy :: forall a k' k'' k m.
                                ( MonadHandler m, HandlerSite m ~ UniWorX
                                , MonadThrow m
                                , MonadUnliftIO m
                                , Typeable k'', Hashable k'', Eq k''
                                , Typeable a, Binary a, NFData a
                                , Typeable k', Hashable k', Eq k'
                                , Binary k
                                )
                             => k'
                             -> Word64 -- ^ burst-size (tokens)
                             -> Word64 -- ^ avg. inverse rate (usec/token)
                             -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
                             -> Maybe Expiry
                             -> DiffTime
                             -> k''
                             -> k
                             -> m a
                             -> m (Maybe a)
memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) lK burst rate tokens

-- | Splice for 'memcachedLimitedTimeoutBy' keyed by the source location.
memcachedLimitedTimeoutHere :: Q Exp
memcachedLimitedTimeoutHere = do
  loc <- location
  [e| \burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |]

-- | Splice for 'memcachedLimitedKeyTimeoutBy' keyed by the source location.
memcachedLimitedKeyTimeoutHere :: Q Exp
memcachedLimitedKeyTimeoutHere = do
  loc <- location
  [e| \lK burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |]

-- | Splice for 'memcachedLimitedTimeoutBy' keyed by source location and user
-- key.
memcachedLimitedTimeoutByHere :: Q Exp
memcachedLimitedTimeoutByHere = do
  loc <- location
  [e| \burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]

-- | Splice for 'memcachedLimitedKeyTimeoutBy' keyed by source location and
-- user key.
memcachedLimitedKeyTimeoutByHere :: Q Exp
memcachedLimitedKeyTimeoutByHere = do
  loc <- location
  [e| \lK burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]