module Handler.Utils.Memcached ( memcachedAvailable , memcached, memcachedBy , memcachedHere, memcachedByHere , memcachedSet, memcachedGet , memcachedInvalidate, memcachedByInvalidate , memcachedByGet, memcachedBySet , memcachedTimeout, memcachedTimeoutBy , memcachedTimeoutHere, memcachedTimeoutByHere , memcachedLimited, memcachedLimitedKey, memcachedLimitedBy, memcachedLimitedKeyBy , memcachedLimitedHere, memcachedLimitedKeyHere, memcachedLimitedByHere, memcachedLimitedKeyByHere , memcachedLimitedTimeout, memcachedLimitedKeyTimeout, memcachedLimitedTimeoutBy, memcachedLimitedKeyTimeoutBy , memcachedLimitedTimeoutHere, memcachedLimitedKeyTimeoutHere, memcachedLimitedTimeoutByHere, memcachedLimitedKeyTimeoutByHere , Expiry , MemcachedException(..), AsyncTimeoutException(..) ) where import Import.NoFoundation hiding (utc, exp) import Foundation.Type import qualified Database.Memcached.Binary.IO as Memcached import Data.Bits (Bits(zeroBits), toIntegralSized) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime) import qualified Data.Binary as Binary import Crypto.Hash.Algorithms (SHAKE256) import qualified Data.ByteArray as BA import Language.Haskell.TH hiding (Type) import Data.Typeable (typeRep) import Type.Reflection (typeOf, TypeRep) import qualified Type.Reflection as Refl (typeRep) import Data.Type.Equality (TestEquality(..)) import qualified Data.HashMap.Strict as HashMap import qualified Control.Concurrent.TokenBucket as Concurrent (TokenBucket, newTokenBucket, tokenBucketTryAlloc) import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent.STM.Delay import qualified Crypto.Saltine.Class as Saltine import qualified Crypto.Saltine.Internal.ByteSizes as Saltine import qualified Crypto.Saltine.Core.AEAD as AEAD import qualified Control.Monad.State.Class as State type Expiry = Either UTCTime DiffTime _MemcachedExpiry :: Prism' Expiry Memcached.Expiry _MemcachedExpiry = prism' fromExpiry toExpiry where toExpiry (Left utc) | posix > 2592000 = toIntegralSized posix | otherwise = Nothing where posix :: Integer posix = ceiling $ utcTimeToPOSIXSeconds utc toExpiry (Right dTime) | 0 < dTime , dTime <= 2592000 = Just $ ceiling dTime | otherwise = Nothing fromExpiry n | n <= 2592000 = Right $ fromIntegral n | otherwise = Left . posixSecondsToUTCTime $ fromIntegral n memcachedAvailable :: ( MonadHandler m, HandlerSite m ~ UniWorX ) => m Bool memcachedAvailable = getsYesod $ is _Just . appMemcached data MemcachedException = MemcachedException Memcached.MemcachedException | MemcachedInvalidExpiry Expiry deriving (Show, Typeable) deriving anyclass (Exception) memcachedKey :: ( Typeable a , Binary k ) => AEAD.Key -> Proxy a -> k -> ByteString memcachedKey (Saltine.encode -> kmacKey) p k = Binary.encode k & kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey & BA.convert memcachedByGet :: forall a k m. ( MonadHandler m, HandlerSite m ~ UniWorX , Typeable a, Binary a , Binary k ) => k -> m (Maybe a) memcachedByGet k = runMaybeT $ do (aeadKey, conn) <- MaybeT $ getsYesod appMemcached let cKey = memcachedKey aeadKey (Proxy @a) k encVal <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey conn $logDebugS "memcached" "Cache hit" guard $ length encVal >= Saltine.secretBoxNonce + Saltine.secretBoxMac let (nonceBS, encrypted) = splitAt Saltine.secretBoxNonce encVal nonce <- hoistMaybe $ Saltine.decode nonceBS decrypted <- hoistMaybe $ AEAD.aeadOpen aeadKey nonce encrypted cKey $logDebugS "memcached" "Decryption valid" case Binary.decodeOrFail $ fromStrict decrypted of Right (unconsumed, _, v) | null unconsumed -> do $logDebugS "memcached" "Deserialization valid" return v _other -> mzero memcachedBySet :: forall a k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a , Binary k ) => Maybe Expiry -> k -> a -> m () memcachedBySet mExp k v = do mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry mConn <- getsYesod appMemcached for_ mConn $ \(aeadKey, conn) -> do nonce <- liftIO AEAD.newNonce let cKey = memcachedKey aeadKey (Proxy @a) k encVal = Saltine.encode nonce <> AEAD.aead aeadKey nonce (toStrict $ Binary.encode v) cKey liftIO $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (fromStrict encVal) conn $logDebugS "memcached" "Cache store" memcachedByInvalidate :: forall a k m p. ( MonadHandler m, HandlerSite m ~ UniWorX , Typeable a , Binary k ) => k -> p a -> m () memcachedByInvalidate k _ = maybeT_ $ do (aeadKey, conn) <- MaybeT $ getsYesod appMemcached let cKey = memcachedKey aeadKey (Proxy @a) k hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey conn newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a } deriving (Typeable) deriving newtype (Eq, Ord, Show, Binary) memcachedGet :: ( MonadHandler m, HandlerSite m ~ UniWorX , Typeable a, Binary a ) => m (Maybe a) memcachedGet = fmap unMemcachedUnkeyed <$> memcachedByGet () memcachedSet :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a ) => Maybe Expiry -> a -> m () memcachedSet mExp = memcachedBySet mExp () . MemcachedUnkeyed memcachedInvalidate :: forall (a :: Type) m p. ( MonadHandler m, HandlerSite m ~ UniWorX , Typeable a ) => p a -> m () memcachedInvalidate _ = memcachedByInvalidate () $ Proxy @(MemcachedUnkeyed a) memcachedWith :: Monad m => (m (Maybe a), a -> m ()) -> m a -> m a memcachedWith (doGet, doSet) act = do pRes <- doGet maybe id (const . return) pRes $ do res <- act doSet res return res memcached :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a ) => Maybe Expiry -> m a -> m a memcached mExp = memcachedWith (memcachedGet, memcachedSet mExp) memcachedBy :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a , Binary k ) => Maybe Expiry -> k -> m a -> m a memcachedBy mExp k = memcachedWith (memcachedByGet k, memcachedBySet mExp k) newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a } deriving (Typeable) deriving newtype (Eq, Ord, Show, Binary) memcachedHere :: Q Exp memcachedHere = do loc <- location [e| \mExp -> fmap unMemcachedUnkeyedLoc . memcachedBy mExp loc . fmap MemcachedUnkeyedLoc |] newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a } deriving (Typeable) deriving newtype (Eq, Ord, Show, Binary) memcachedByHere :: Q Exp memcachedByHere = do loc <- location [e| \mExp k -> fmap unMemcachedKeyedLoc . memcachedBy mExp (loc, k) . fmap MemcachedKeyedLoc |] data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a instance Hashable HashableDynamic where hashWithSalt s (HashableDynamic tRep v) = s `hashWithSalt` tRep `hashWithSalt` v instance Eq HashableDynamic where (HashableDynamic tRep v) == (HashableDynamic tRep' v') = case testEquality tRep tRep' of Just Refl -> v == v' Nothing -> False hashableDynamic :: forall a. ( Typeable a, Hashable a, Eq a ) => a -> HashableDynamic hashableDynamic v = HashableDynamic (typeOf v) v memcachedLimit :: TVar (HashMap HashableDynamic Concurrent.TokenBucket) memcachedLimit = unsafePerformIO . newTVarIO $ HashMap.empty {-# NOINLINE memcachedLimit #-} memcachedLimitedWith :: ( MonadIO m , MonadLogger m , Typeable k', Hashable k', Eq k' ) => (m (Maybe a), a -> m ()) -> (m a -> MaybeT m a) -- ^ Wrap execution on cache miss -> k' -- ^ Key for limiting -> Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> m a -> m (Maybe a) memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate tokens act = runMaybeT $ do pRes <- lift doGet maybe id (const . return) pRes $ do mBucket <- fmap (HashMap.lookup lK) . liftIO $ readTVarIO memcachedLimit bucket <- case mBucket of Just bucket -> return bucket Nothing -> liftIO $ do bucket <- Concurrent.newTokenBucket atomically $ do hm <- readTVar memcachedLimit let hm' = HashMap.insertWith (flip const) lK bucket hm writeTVar memcachedLimit $! hm' return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm' sufficientTokens <- liftIO $ Concurrent.tokenBucketTryAlloc bucket burst rate tokens $logDebugS "memcachedLimitedWith" $ "Sufficient tokens: " <> tshow sufficientTokens guard sufficientTokens liftAct $ do res <- act doSet res return res memcachedLimited :: forall a m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a ) => Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> m a -> m (Maybe a) memcachedLimited burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift (Proxy @a) burst rate tokens memcachedLimitedKey :: forall a k' m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a , Typeable k', Hashable k', Eq k' ) => k' -> Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> m a -> m (Maybe a) memcachedLimitedKey lK burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift lK burst rate tokens memcachedLimitedBy :: forall a k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a , Binary k ) => Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> k -> m a -> m (Maybe a) memcachedLimitedBy burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift (Proxy @a) burst rate tokens memcachedLimitedKeyBy :: forall a k' k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , Typeable a, Binary a , Typeable k', Hashable k', Eq k' , Binary k ) => k' -> Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> k -> m a -> m (Maybe a) memcachedLimitedKeyBy lK burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift lK burst rate tokens memcachedLimitedHere :: Q Exp memcachedLimitedHere = do loc <- location [e| \burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedBy burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |] memcachedLimitedKeyHere :: Q Exp memcachedLimitedKeyHere = do loc <- location [e| \lK burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |] memcachedLimitedByHere :: Q Exp memcachedLimitedByHere = do loc <- location [e| \burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedBy burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |] memcachedLimitedKeyByHere :: Q Exp memcachedLimitedKeyByHere = do loc <- location [e| \lK burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |] data AsyncTimeoutException = AsyncTimeoutReturnTypeDoesNotMatchComputationKey deriving (Show, Typeable) deriving anyclass (Exception) data DynamicAsync = forall a. DynamicAsync !(TypeRep a) !(Async a) instance Eq DynamicAsync where (DynamicAsync tRep v) == (DynamicAsync tRep' v') = case testEquality tRep tRep' of Just Refl -> v == v' Nothing -> False memcachedAsync :: TVar (HashMap HashableDynamic DynamicAsync) memcachedAsync = unsafePerformIO . newTVarIO $ HashMap.empty {-# NOINLINE memcachedAsync #-} liftAsyncTimeout :: forall k'' a m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadUnliftIO m , MonadThrow m , Typeable k'', Hashable k'', Eq k'' , Typeable a ) => DiffTime -> k'' -> m a -> MaybeT m a liftAsyncTimeout dt (hashableDynamic -> cK) act = ifNotM memcachedAvailable (lift act) $ do delay <- liftIO . newDelay . round $ toRational dt * 1e6 act' <- lift $ do existing <- traverse castDynamicAsync . HashMap.lookup cK <=< liftIO $ readTVarIO memcachedAsync case existing of Just act' -> return act' Nothing -> do startAct <- liftIO newEmptyTMVarIO act' <- async $ do $logDebugS "liftAsyncTimeout" "Waiting for confirmation..." atomically $ takeTMVar startAct $logDebugS "liftAsyncTimeout" "Confirmed." act act'' <- atomically $ do hm <- readTVar memcachedAsync let new = DynamicAsync (Refl.typeRep @a) act' go mOld = case mOld of Just old' -> do old <- castDynamicAsync old' resolved <- lift $ is _Just <$> pollSTM old if | resolved -> return $ Just new | otherwise -> do State.put old return $ Just old' Nothing -> return $ Just new (hm', act'') <- runStateT (HashMap.alterF go cK hm) act' writeTVar memcachedAsync $! hm' return act'' if | act' == act'' -> atomically $ putTMVar startAct () | otherwise -> cancel act' return act'' MaybeT . atomically $ (Nothing <$ waitDelay delay) <|> (Just <$> waitSTM act') where castDynamicAsync :: forall m'. MonadThrow m' => DynamicAsync -> m' (Async a) castDynamicAsync (DynamicAsync tRep v) | Just Refl <- testEquality tRep (Refl.typeRep @a) = return v | otherwise = throwM AsyncTimeoutReturnTypeDoesNotMatchComputationKey memcachedTimeoutWith :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadUnliftIO m , MonadThrow m , Typeable k'', Hashable k'', Eq k'' , Typeable a ) => (m (Maybe a), a -> m ()) -> DiffTime -> k'' -> m a -> m (Maybe a) memcachedTimeoutWith (doGet, doSet) dt cK act = runMaybeT $ do pRes <- lift doGet maybe id (const . return) pRes $ liftAsyncTimeout dt cK $ do res <- act doSet res return res memcachedTimeout :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' , Typeable a, Binary a ) => Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a) memcachedTimeout mExp = memcachedTimeoutWith (memcachedGet, memcachedSet mExp) memcachedTimeoutBy :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' , Typeable a, Binary a , Binary k ) => Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a) memcachedTimeoutBy mExp dt cK k = memcachedTimeoutWith (memcachedByGet k, memcachedBySet mExp k) dt cK memcachedTimeoutHere :: Q Exp memcachedTimeoutHere = do loc <- location [e| \mExp dt cK -> fmap unMemcachedUnkeyedLoc . memcachedTimeoutBy mExp dt cK loc . fmap MemcachedUnkeyedLoc |] memcachedTimeoutByHere :: Q Exp memcachedTimeoutByHere = do loc <- location [e| \mExp dt cK k -> fmap unMemcachedKeyedLoc . memcachedBy mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |] memcachedLimitedTimeout :: forall a k'' m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' , Typeable a, Binary a ) => Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a) memcachedLimitedTimeout burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens memcachedLimitedKeyTimeout :: forall a k' k'' m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' , Typeable a, Binary a , Typeable k', Hashable k', Eq k' ) => k' -> Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a) memcachedLimitedKeyTimeout lK burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) lK burst rate tokens memcachedLimitedTimeoutBy :: forall a k'' k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' , Typeable a, Binary a , Binary k ) => Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a) memcachedLimitedTimeoutBy burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens memcachedLimitedKeyTimeoutBy :: forall a k' k'' k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' , Typeable a, Binary a , Typeable k', Hashable k', Eq k' , Binary k ) => k' -> Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform -> Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a) memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) lK burst rate tokens memcachedLimitedTimeoutHere :: Q Exp memcachedLimitedTimeoutHere = do loc <- location [e| \burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |] memcachedLimitedKeyTimeoutHere :: Q Exp memcachedLimitedKeyTimeoutHere = do loc <- location [e| \lK burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |] memcachedLimitedTimeoutByHere :: Q Exp memcachedLimitedTimeoutByHere = do loc <- location [e| \burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |] memcachedLimitedKeyTimeoutByHere :: Q Exp memcachedLimitedKeyTimeoutByHere = do loc <- location [e| \lK burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]