804 lines
34 KiB
Haskell
804 lines
34 KiB
Haskell
{-# LANGUAGE DuplicateRecordFields #-}
|
|
|
|
module Handler.Utils.Memcached
|
|
( memcachedAvailable
|
|
, memcached, memcachedBy
|
|
, memcachedHere, memcachedByHere
|
|
, memcachedSet, memcachedGet
|
|
, memcachedInvalidate, memcachedByInvalidate
|
|
, manageMemcachedLocalInvalidations
|
|
, memcachedByGet, memcachedBySet
|
|
, memcachedTimeout, memcachedTimeoutBy
|
|
, memcachedTimeoutHere, memcachedTimeoutByHere
|
|
, memcachedLimited, memcachedLimitedKey, memcachedLimitedBy, memcachedLimitedKeyBy
|
|
, memcachedLimitedHere, memcachedLimitedKeyHere, memcachedLimitedByHere, memcachedLimitedKeyByHere
|
|
, memcachedLimitedTimeout, memcachedLimitedKeyTimeout, memcachedLimitedTimeoutBy, memcachedLimitedKeyTimeoutBy
|
|
, memcachedLimitedTimeoutHere, memcachedLimitedKeyTimeoutHere, memcachedLimitedTimeoutByHere, memcachedLimitedKeyTimeoutByHere
|
|
, memcacheAuth, memcacheAuthHere
|
|
, memcacheAuth', memcacheAuthHere'
|
|
, memcacheAuthMax, memcacheAuthHereMax
|
|
, Expiry
|
|
, MemcachedException(..), AsyncTimeoutException(..)
|
|
) where
|
|
|
|
import Import.NoFoundation hiding (utc, exp)
|
|
import Foundation.Type
|
|
|
|
|
|
import qualified Database.Memcached.Binary.IO as Memcached
|
|
import Data.Bits (Bits(zeroBits), toIntegralSized)
|
|
|
|
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime, getPOSIXTime, POSIXTime)
|
|
|
|
import qualified Data.Binary as Binary
|
|
import qualified Data.Binary.Put as Binary
|
|
import qualified Data.Binary.Get as Binary
|
|
|
|
import Crypto.Hash.Algorithms (SHAKE256)
|
|
|
|
import qualified Data.ByteArray as BA
|
|
|
|
import qualified Data.ByteString.Base64 as Base64
|
|
|
|
import Language.Haskell.TH hiding (Type)
|
|
|
|
import Data.Typeable (typeRep, typeRepFingerprint)
|
|
import Type.Reflection (typeOf, TypeRep)
|
|
import qualified Type.Reflection as Refl (typeRep)
|
|
import Data.Type.Equality (TestEquality(..))
|
|
|
|
import qualified Data.HashMap.Strict as HashMap
|
|
|
|
import qualified Control.Concurrent.TokenBucket as Concurrent (TokenBucket, newTokenBucket, tokenBucketTryAlloc)
|
|
|
|
import System.IO.Unsafe (unsafePerformIO)
|
|
|
|
import Control.Concurrent.STM.Delay
|
|
|
|
import qualified Crypto.Saltine.Class as Saltine
|
|
import qualified Crypto.Saltine.Internal.SecretBox as Saltine
|
|
import qualified Crypto.Saltine.Core.AEAD as AEAD
|
|
|
|
import qualified Control.Monad.State.Class as State
|
|
|
|
import qualified Data.ByteString.Lazy as Lazy (ByteString)
|
|
|
|
import GHC.Fingerprint
|
|
|
|
import Utils.Postgresql
|
|
|
|
import UnliftIO.Concurrent (threadDelay)
|
|
|
|
|
|
-- | Requested lifetime of a cache entry: 'Left' is an absolute point in time,
-- 'Right' is a duration that is added to the current time when the entry is stored.
type Expiry = Either UTCTime DiffTime
|
|
|
|
-- | Conversion between 'Expiry' and the numeric expiry understood by the memcached client.
--
-- Numbers up to 'relativeLimit' (2592000 seconds, i.e. 30 days) denote a relative
-- duration, anything larger an absolute POSIX timestamp. Values of 'Expiry' that
-- cannot be expressed that way (non-positive or too long durations, timestamps not
-- exceeding the limit or not fitting the target type) do not match the prism.
_MemcachedExpiry :: Prism' Expiry Memcached.Expiry
_MemcachedExpiry = prism' decodeExpiry encodeExpiry
  where
    relativeLimit :: Num n => n
    relativeLimit = 2592000

    encodeExpiry :: Expiry -> Maybe Memcached.Expiry
    encodeExpiry = \case
      Left absolute ->
        let seconds :: Integer
            seconds = ceiling $ utcTimeToPOSIXSeconds absolute
         in if seconds > relativeLimit
              then toIntegralSized seconds
              else Nothing
      Right relative
        | relative > 0, relative <= relativeLimit -> Just $ ceiling relative
        | otherwise -> Nothing

    decodeExpiry :: Memcached.Expiry -> Expiry
    decodeExpiry raw
      | raw > relativeLimit = Left . posixSecondsToUTCTime $ fromIntegral raw
      | otherwise = Right $ fromIntegral raw
|
|
|
|
-- | An encrypted cache entry in the form it is written to and read from memcached.
data MemcachedValue = MemcachedValue
  { mNonce      :: AEAD.Nonce
    -- ^ Nonce passed to the AEAD primitive for 'mCiphertext'
  , mExpiry     :: Maybe POSIXTime
    -- ^ Expiry serialised next to the ciphertext; 'Nothing' if the entry does not expire
  , mCiphertext :: ByteString
    -- ^ Output of the AEAD encryption of the serialised payload
  } deriving (Generic, Typeable)
|
|
|
|
-- | Serialise an optional expiry as a 'Word64' holding whole seconds (rounded up).
--
-- 'Nothing' is written as @0@. A timestamp that is not strictly positive or does
-- not fit below @maxBound :: Word64@ results in a call to 'error' once the
-- encoded number is forced.
putExpiry :: Maybe POSIXTime -> Binary.Put
putExpiry = Binary.put . maybe noExpiry encodeSeconds
  where
    noExpiry :: Word64
    noExpiry = 0

    encodeSeconds :: POSIXTime -> Word64
    encodeSeconds t
      | secs <= 0 || secs >= upperBound = error "Expiry cannot be represented in 64 unsigned bits"
      | otherwise = fromIntegral secs
      where
        secs, upperBound :: Integer
        secs = ceiling t
        upperBound = fromIntegral (maxBound :: Word64)
|
|
|
|
-- | Inverse of 'putExpiry': read a 'Word64' and interpret @0@ as \"no expiry\".
getExpiry :: Binary.Get (Maybe POSIXTime)
getExpiry = Binary.label "expiry" $ fromWire <$> (Binary.get :: Binary.Get Word64)
  where
    fromWire :: Word64 -> Maybe POSIXTime
    fromWire 0    = Nothing
    fromWire secs = Just $ fromIntegral secs
|
|
|
|
-- | Serialise a 'MemcachedValue' as nonce, then expiry (see 'putExpiry'), then ciphertext.
putMemcachedValue :: MemcachedValue -> Binary.Put
putMemcachedValue (MemcachedValue nonce expiry ciphertext)
  =  Binary.putByteString (Saltine.encode nonce)
  *> putExpiry expiry
  *> Binary.putByteString ciphertext
|
|
|
|
-- | Shared parser for both on-wire layouts of 'MemcachedValue'.
--
-- The first argument is the number of bytes the expiry field occupies in the
-- layout being parsed, the second the parser for that field. A look-ahead
-- ensures that at least nonce, expiry field and authentication tag are present
-- before anything is consumed; the remainder of the input becomes the ciphertext.
getMemcachedValueWith :: Int -> Binary.Get (Maybe POSIXTime) -> Binary.Get MemcachedValue
getMemcachedValueWith expiryBytes parseExpiry = do
  Binary.lookAhead . Binary.label "length check" $
    void . Binary.getByteString $ Saltine.secretbox_noncebytes + expiryBytes + Saltine.secretbox_macbytes
  mNonce <- Binary.label "nonce" $ Binary.getByteString Saltine.secretbox_noncebytes >>= hoistMaybe . Saltine.decode
  mExpiry <- parseExpiry
  mCiphertext <- Binary.label "ciphertext" $ toStrict <$> Binary.getRemainingLazyByteString
  return MemcachedValue{..}

-- | Parsers for cache entries with ('getMemcachedValue') and without
-- ('getMemcachedValueNoExpiry') a serialised expiry.
--
-- The expiry is written by 'putExpiry' as a 'Word64', i.e. 8 bytes; the layout
-- without expiry has no such field at all. The minimum length checked up front
-- now reflects exactly that (previously both variants assumed 4 bytes, which
-- rejected entries without expiry whose payload was shorter than 4 bytes).
getMemcachedValue, getMemcachedValueNoExpiry :: Binary.Get MemcachedValue
getMemcachedValue = getMemcachedValueWith 8 getExpiry
getMemcachedValueNoExpiry = getMemcachedValueWith 0 $ pure Nothing
|
|
|
|
-- | Whether the site has a memcached connection configured ('appMemcached' is 'Just').
memcachedAvailable
  :: ( MonadHandler m, HandlerSite m ~ UniWorX
     )
  => m Bool
memcachedAvailable = is _Just <$> getsYesod appMemcached
|
|
|
|
|
|
-- | Exceptions specific to the memcached helpers of this module.
data MemcachedException
  = MemcachedException Memcached.MemcachedException
    -- ^ Wrapper around an exception of the memcached client library
  | MemcachedInvalidExpiry Expiry
    -- ^ The requested 'Expiry' has no representation via '_MemcachedExpiry'
  deriving (Show, Typeable)
  deriving anyclass (Exception)
|
|
|
|
|
|
-- | Derive the key used within memcached from the secret key, the type of the
-- cached value and the serialised user-supplied key.
--
-- The textual representation of the value's type is passed as first argument to
-- 'kmaclazy', the encoded secret key as second and the user key as input.
toMemcachedKey
  :: Typeable a
  => AEAD.Key -> Proxy a -> Lazy.ByteString -> ByteString
toMemcachedKey key p payload =
  BA.convert $ kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) (Saltine.encode key) payload
|
|
|
|
-- | Additional authenticated data for a cache entry: the memcached key followed
-- by the expiry in the encoding of 'putExpiry'.
memcachedAAD :: ByteString -> Maybe POSIXTime -> ByteString
memcachedAAD cKey mExpiry =
  toStrict . Binary.runPut $ Binary.putByteString cKey *> putExpiry mExpiry
|
|
|
|
-- | Look up a cached value by key, consulting the local ARC first and memcached second.
--
-- Entries whose expiry has passed are treated as misses. Entries fetched from
-- memcached are decoded, checked against their expiry (with a leniency of
-- 'clockLeniency' seconds), decrypted and, if a local ARC is configured, stored
-- there. Both wire layouts (with and without serialised expiry) are attempted.
-- Any failure along the way yields 'Nothing'.
memcachedByGet
  :: forall a k m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => k -> m (Maybe a)
memcachedByGet (Binary.encode -> k) = runMaybeT $ fromLocalARC <|> fromMemcached
  where
    arcKey = (typeRepFingerprint . typeRep $ Proxy @a, k)

    fromLocalARC = do
      AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal
      cached <- hoistMaybe . preview (_1 . _NFDynamic) <=< hoistMaybe <=< cachedARC' memcachedLocalARC arcKey $ \mPrev -> do
        entry@((_, entryExpiry), _) <- hoistMaybe mPrev
        $logDebugS "memcached" "Cache hit (local ARC)"
        -- Yielding 'Nothing' for an expired entry is what deletes it from the ARC
        lift . runMaybeT $ do
          for_ entryExpiry $ \expiry -> do
            now <- liftIO getPOSIXTime
            guard $ expiry > now
          return entry
      $logDebugS "memcached" "All valid (local ARC)"
      return cached

    fromMemcached = do
      AppMemcached{..} <- MaybeT $ getsYesod appMemcached
      localARC <- getsYesod appMemcachedLocal
      let cKey = toMemcachedKey memcachedKey (Proxy @a) k

      rawEntry <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey memcachedConn

      $logDebugS "memcached" "Cache hit"

      let withExp expectExpiry = do
            let parser = if expectExpiry then getMemcachedValue else getMemcachedValueNoExpiry
            MemcachedValue{..} <- hoistMaybe $ runGetMaybe parser rawEntry
            $logDebugS "memcached" "Decode valid"
            for_ mExpiry $ \expiry -> do
              now <- liftIO getPOSIXTime
              guard $ expiry > now + clockLeniency
            $logDebugS "memcached" $ "Expiry valid: " <> tshow mExpiry
            let aad = memcachedAAD cKey mExpiry
            plaintext <- hoistMaybe $ AEAD.aeadOpen memcachedKey mNonce mCiphertext aad

            $logDebugS "memcached" $ "Decryption valid " <> bool "without" "with" expectExpiry <> " expiration"

            -- Without a local ARC the computation is simply run as if nothing was cached
            let withCache = case localARC of
                  Just AppMemcachedLocal{..} -> cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k)
                  Nothing -> fmap (view _1) . ($ Nothing)
            value <- hoistMaybe . preview (_1 . _NFDynamic) <=< withCache $ \case
              Just present -> return present
              Nothing -> fmap ((, length plaintext) . (, mExpiry) . review (_NFDynamic @a)) . hoistMaybe $ runGetMaybe Binary.get plaintext

            $logDebugS "memcached" "All valid"

            return value

      withExp True <|> withExp False
      where
        -- Run a parser, succeeding only if it consumes the entire input
        runGetMaybe parser input = case Binary.runGetOrFail parser (fromStrict input) of
          Right (leftover, _, parsed) | null leftover -> Just parsed
          _other -> Nothing

        clockLeniency :: NominalDiffTime
        clockLeniency = 2
|
|
|
|
-- | Store a value under the given key (serialised via 'Binary.encode') in
-- memcached and in the local ARC, for whichever of the two is configured.
--
-- The optional 'Expiry' is converted through '_MemcachedExpiry'; if that fails
-- 'MemcachedInvalidExpiry' is thrown before anything is stored. The value is
-- encrypted with a fresh nonce, using the key and the absolute expiry as
-- additional authenticated data (see 'memcachedAAD'). Exceptions of type
-- 'Memcached.MemcachedException' raised by the store operation are discarded.
--
-- NOTE(review): indentation of this definition was lost in this copy of the
-- file, so the nesting of the log statements and of the @DEBUG@ section cannot
-- be read off here; confirm against the original before restructuring.
memcachedBySet :: forall a k m.
|
|
( MonadHandler m, HandlerSite m ~ UniWorX
|
|
, MonadThrow m
|
|
, Typeable a, Binary a, NFData a
|
|
, Binary k
|
|
)
|
|
=> Maybe Expiry -> k -> a -> m ()
|
|
memcachedBySet mExp (Binary.encode -> k) v = do
|
|
mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry
|
|
|
|
let decrypted = toStrict $ Binary.encode v
|
|
mExpiry <- for mExp $ \case
|
|
Left uTime -> return $ utcTimeToPOSIXSeconds uTime
|
|
Right diff -> liftIO $ (+ realToFrac diff) <$> getPOSIXTime
|
|
|
|
mConn <- getsYesod appMemcached
|
|
for_ mConn $ \AppMemcached{..} -> do
|
|
mNonce <- liftIO AEAD.newNonce
|
|
let cKey = toMemcachedKey memcachedKey (Proxy @a) k
|
|
aad = memcachedAAD cKey mExpiry
|
|
mCiphertext = AEAD.aead memcachedKey mNonce decrypted aad
|
|
liftIO . handle (\(_ :: Memcached.MemcachedException) -> return ()) $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (Binary.runPut $ putMemcachedValue MemcachedValue{..}) memcachedConn
|
|
$logDebugS "memcached" $ "Cache store: " <> tshow mExpiry
|
|
|
|
mLocal <- getsYesod appMemcachedLocal
|
|
for_ mLocal $ \AppMemcachedLocal{..} -> do
|
|
void . cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) . const $ return ((_NFDynamic # v, mExpiry), length decrypted)
|
|
$logDebugS "memcached" $ "Cache store: " <> tshow mExpiry <> " (local ARC)"
|
|
-- DEBUG
|
|
let inv = Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..}
|
|
where mLocalInvalidateType = typeRepFingerprint . typeRep $ Proxy @a
|
|
mLocalInvalidateKey = k
|
|
$logDebugS "memcached" $ "To invalidate remotely: " <> tshow inv
|
|
|
|
-- | Remove the entry for the given key and value type from the local ARC and
-- from memcached, in that order.
--
-- For the local ARC the key is additionally appended to
-- 'memcachedLocalInvalidationQueue'. A \"key not found\" response from memcached
-- (as recognised by 'Memcached.isKeyNotFound') is not treated as an error.
memcachedByInvalidate
  :: forall a k m p.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , Typeable a
     , Binary k
     )
  => k -> p a -> m ()
memcachedByInvalidate key _ = invalidateLocal >> invalidateRemote
  where
    encodedKey = Binary.encode key

    invalidateLocal = maybeT_ $ do
      AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal
      let arcKey = (typeRepFingerprint . typeRep $ Proxy @a, encodedKey)
      atomically $ modifyTVar' memcachedLocalInvalidationQueue (:> arcKey)
      void . cachedARC' memcachedLocalARC arcKey . const $ return Nothing
      $logDebugS "memcached" "Cache invalidation (local ARC)"

    invalidateRemote = maybeT_ $ do
      AppMemcached{..} <- MaybeT $ getsYesod appMemcached
      let cKey = toMemcachedKey memcachedKey (Proxy @a) encodedKey
      hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey memcachedConn
      $logDebugS "memcached" "Cache invalidation"
|
|
|
|
-- | Message identifying one local-ARC entry (value type and serialised key) that
-- is to be invalidated.
data MemcachedLocalInvalidateMsg = MemcachedLocalInvalidateMsg
  { mLocalInvalidateType :: Fingerprint
    -- ^ Fingerprint of the cached value's type
  , mLocalInvalidateKey  :: Lazy.ByteString
    -- ^ Serialised cache key
  } deriving (Eq, Ord, Show, Typeable)

-- | Encoded as the two words of the fingerprint (little endian) followed by the
-- key, which extends to the end of the input.
instance Binary MemcachedLocalInvalidateMsg where
  get = Binary.label "MemcachedLocalInvalidateMsg" $
    MemcachedLocalInvalidateMsg
      <$> Binary.label "mLocalInvalidateType" (Fingerprint <$> Binary.getWord64le <*> Binary.getWord64le)
      <*> Binary.label "mLocalInvalidateKey" Binary.getRemainingLazyByteString

  put (MemcachedLocalInvalidateMsg (Fingerprint fpFst fpSnd) key) = do
    Binary.putWord64le fpFst
    Binary.putWord64le fpSnd
    Binary.putLazyByteString key
|
|
|
|
-- | Channel manager exchanging local-ARC invalidations.
--
-- Incoming payloads are Base64-decoded and parsed as 'MemcachedLocalInvalidateMsg';
-- the referenced entry is then removed from the given ARC. Payloads that cannot
-- be parsed completely are logged as errors. Outgoing payloads are produced by
-- taking the oldest element of the invalidation queue (blocking while the queue
-- is empty) and encoding it the same way. The termination action never finishes.
manageMemcachedLocalInvalidations
  :: ( MonadUnliftIO m
     , MonadLogger m
     )
  => ARCHandle (Fingerprint, Lazy.ByteString) Int (NFDynamic, Maybe POSIXTime)
  -> TVar (Seq (Fingerprint, Lazy.ByteString))
  -> PostgresqlChannelManager m ()
manageMemcachedLocalInvalidations localARC iQueue = PostgresqlChannelManager
  { pgcTerminate = forever $ threadDelay maxBound
  , pgcOnInput = Just onInput
  , pgcGenOutput = atomically dequeue
  }
  where
    onInput inpBS = case Binary.runGetOrFail Binary.get . fromStrict <$> Base64.decode inpBS of
      Right (Right (leftover, _, MemcachedLocalInvalidateMsg{..}))
        | null leftover ->
            void . cachedARC' localARC (mLocalInvalidateType, mLocalInvalidateKey) $ \mPrev -> do
              $logDebugS "memcached" $ "Remote invalidation in local ARC: " <> bool "miss" "hit" (is _Just mPrev)
              return Nothing
      _other -> $logErrorS "memcached" $ "Received unparseable remote invalidation: " <> tshow inpBS

    dequeue = do
      pending <- readTVar iQueue
      case pending of
        (mLocalInvalidateType, mLocalInvalidateKey) :< remaining -> do
          writeTVar iQueue remaining
          return . Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..}
        _other -> mzero
|
|
|
|
|
|
-- | Wrapper marking values cached without a user-supplied key, so that their
-- type differs from the same value cached under a key.
newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a }
  deriving (Typeable)
  deriving newtype (Eq, Ord, Show, Binary)

instance NFData a => NFData (MemcachedUnkeyed a) where
  rnf (MemcachedUnkeyed inner) = rnf inner
|
|
|
|
-- | Fetch the value cached without explicit key, i.e. identified by its type only.
memcachedGet
  :: ( MonadHandler m, HandlerSite m ~ UniWorX
     , Typeable a, Binary a, NFData a
     )
  => m (Maybe a)
memcachedGet = do
  cached <- memcachedByGet ()
  return $ unMemcachedUnkeyed <$> cached
|
|
|
|
-- | Store a value without explicit key, i.e. identified by its type only.
memcachedSet
  :: ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     )
  => Maybe Expiry -> a -> m ()
memcachedSet mExp val = memcachedBySet mExp () $ MemcachedUnkeyed val
|
|
|
|
-- | Invalidate the value of the given type that was cached without explicit key.
memcachedInvalidate
  :: forall (a :: Type) m p.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , Typeable a
     )
  => p a -> m ()
memcachedInvalidate _ = memcachedByInvalidate () (Proxy :: Proxy (MemcachedUnkeyed a))
|
|
|
|
|
|
-- | Generic read-through caching: return the result of the getter if it yields
-- one; otherwise run the action and pass its result to the setter, whose result
-- is returned.
memcachedWith
  :: Monad m
  => (m (Maybe b), a -> m b) -> m a -> m b
memcachedWith (doGet, doSet) act = maybe (act >>= doSet) return =<< doGet
|
|
|
|
-- | Cache the result of an action, identified by its type only.
--
-- On a cache miss the action is run and its result stored with the given expiry.
memcached
  :: ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     )
  => Maybe Expiry -> m a -> m a
memcached mExp = memcachedWith (memcachedGet, storeAndReturn)
  where
    storeAndReturn val = do
      memcachedSet mExp val
      return val
|
|
|
|
-- | Cache the result of an action under the given key.
--
-- On a cache miss the action is run and its result stored with the given expiry.
memcachedBy
  :: forall a m k.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => Maybe Expiry -> k -> m a -> m a
memcachedBy mExp k = memcachedWith (memcachedByGet k, storeAndReturn)
  where
    storeAndReturn val = do
      memcachedBySet mExp k val
      return val
|
|
|
|
|
|
-- | Wrapper for values cached under a source location only (see the @…Here@
-- Template Haskell helpers that take no additional key).
newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a }
  deriving (Typeable)
  deriving newtype (Eq, Ord, Show, Binary)

instance NFData a => NFData (MemcachedUnkeyedLoc a) where
  rnf = rnf . unMemcachedUnkeyedLoc
|
|
|
|
-- | Template Haskell variant of 'memcachedBy' that uses the source location
-- reported by 'location' as key.
--
-- The spliced expression takes the optional expiry and the action to cache.
memcachedHere :: Q Exp
memcachedHere = location >>= \here ->
  [e| \mExp act -> unMemcachedUnkeyedLoc <$> memcachedBy mExp here (MemcachedUnkeyedLoc <$> act) |]
|
|
|
|
-- | Wrapper for values cached under a source location combined with a
-- user-supplied key (see the @…ByHere@ Template Haskell helpers).
newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a }
  deriving (Typeable)
  deriving newtype (Eq, Ord, Show, Binary)

instance NFData a => NFData (MemcachedKeyedLoc a) where
  rnf = rnf . unMemcachedKeyedLoc
|
|
|
|
-- | Run a transformation that operates on 'MemcachedKeyedLoc'-wrapped values on
-- unwrapped ones, wrapping the input and unwrapping the output.
withMemcachedKeyedLoc :: Functor f => (f (MemcachedKeyedLoc a) -> f (MemcachedKeyedLoc a)) -> (f a -> f a)
withMemcachedKeyedLoc act = \plain -> unMemcachedKeyedLoc <$> act (MemcachedKeyedLoc <$> plain)
{-# INLINE withMemcachedKeyedLoc #-}
|
|
|
|
-- | Like 'withMemcachedKeyedLoc' for transformations whose result is nested in
-- a further functor (e.g. 'Maybe').
withMemcachedKeyedLoc' :: (Functor f, Functor f') => (f (MemcachedKeyedLoc a) -> f (f' (MemcachedKeyedLoc a))) -> (f a -> f (f' a))
withMemcachedKeyedLoc' act = \plain -> fmap unMemcachedKeyedLoc <$> act (MemcachedKeyedLoc <$> plain)
{-# INLINE withMemcachedKeyedLoc' #-}
|
|
|
|
-- | Template Haskell variant of 'memcachedBy' that pairs the source location
-- reported by 'location' with a user-supplied key.
--
-- The spliced expression takes the optional expiry, the key and the action.
memcachedByHere :: Q Exp
memcachedByHere = location >>= \here ->
  [e| \mExp k -> withMemcachedKeyedLoc $ memcachedBy mExp (here, k) |]
|
|
|
|
|
|
-- | A value of arbitrary hashable type paired with its type representation, so
-- that values of different types can serve as keys of one 'HashMap'.
data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a

-- | Hashes the type representation first, then the value.
instance Hashable HashableDynamic where
  hashWithSalt salt (HashableDynamic rep val) = hashWithSalt (hashWithSalt salt rep) val

-- | Two values are equal iff their types coincide and the values compare equal.
instance Eq HashableDynamic where
  HashableDynamic repL valL == HashableDynamic repR valR
    | Just Refl <- testEquality repL repR = valL == valR
    | otherwise = False
|
|
|
|
-- | Wrap a value together with the representation of its type.
hashableDynamic
  :: forall a.
     ( Typeable a, Hashable a, Eq a )
  => a -> HashableDynamic
hashableDynamic = HashableDynamic (Refl.typeRep @a)
|
|
|
|
-- | Process-global registry of token buckets used by 'memcachedLimitedWith',
-- indexed by limiting key.
--
-- Created via 'unsafePerformIO'; the @NOINLINE@ pragma keeps it a single shared 'TVar'.
memcachedLimit :: TVar (HashMap HashableDynamic Concurrent.TokenBucket)
memcachedLimit = unsafePerformIO $ newTVarIO HashMap.empty
{-# NOINLINE memcachedLimit #-}
|
|
|
|
-- | Rate-limited read-through caching.
--
-- A cached result is returned directly. On a cache miss the token bucket
-- registered for the limiting key (created on first use) is asked for the given
-- number of tokens; if it cannot provide them the result is 'Nothing' and the
-- action is not run. Otherwise the action is run via the supplied wrapper and
-- its result is stored and returned.
memcachedLimitedWith
  :: ( MonadIO m
     , MonadLogger m
     , Typeable k', Hashable k', Eq k'
     )
  => (m (Maybe a), a -> m ())
  -> (m a -> MaybeT m a) -- ^ Wrap execution on cache miss
  -> k' -- ^ Key for limiting
  -> Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> m a
  -> m (Maybe a)
memcachedLimitedWith (doGet, doSet) liftAct limitKey burst rate tokens act = runMaybeT $ do
  cached <- lift doGet
  case cached of
    Just hit -> return hit
    Nothing -> do
      bucket <- liftIO obtainBucket
      sufficientTokens <- liftIO $ Concurrent.tokenBucketTryAlloc bucket burst rate tokens
      $logDebugS "memcachedLimitedWith" $ "Sufficient tokens: " <> tshow sufficientTokens
      guard sufficientTokens

      liftAct $ do
        res <- act
        doSet res
        return res
  where
    lK = hashableDynamic limitKey

    -- Look up the bucket for the limiting key; if absent, register a fresh one
    -- unless another thread has done so in the meantime.
    obtainBucket :: IO Concurrent.TokenBucket
    obtainBucket = do
      known <- HashMap.lookup lK <$> readTVarIO memcachedLimit
      case known of
        Just bucket -> return bucket
        Nothing -> do
          fresh <- Concurrent.newTokenBucket
          atomically $ do
            hm <- readTVar memcachedLimit
            let hm' = HashMap.insertWith (const id) lK fresh hm
            writeTVar memcachedLimit $! hm'
            return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm'
|
|
|
|
-- | Rate-limited caching of an action identified by its result type only; the
-- result type also serves as limiting key. See 'memcachedLimitedWith'.
memcachedLimited
  :: forall a m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     )
  => Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> m a
  -> m (Maybe a)
memcachedLimited burst rate tokens mExp act =
  memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift (Proxy :: Proxy a) burst rate tokens act
|
|
|
|
-- | Like 'memcachedLimited', with an explicit limiting key.
memcachedLimitedKey
  :: forall a k' m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     , Typeable k', Hashable k', Eq k'
     )
  => k'
  -> Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> m a
  -> m (Maybe a)
memcachedLimitedKey lK burst rate tokens mExp act =
  memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift lK burst rate tokens act
|
|
|
|
-- | Rate-limited caching of an action under the given cache key; the result
-- type serves as limiting key. See 'memcachedLimitedWith'.
memcachedLimitedBy
  :: forall a k m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> k
  -> m a
  -> m (Maybe a)
memcachedLimitedBy burst rate tokens mExp k act =
  memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift (Proxy :: Proxy a) burst rate tokens act
|
|
|
|
-- | Like 'memcachedLimitedBy', with an explicit limiting key.
memcachedLimitedKeyBy
  :: forall a k' k m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     , Typeable k', Hashable k', Eq k'
     , Binary k
     )
  => k'
  -> Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> k
  -> m a
  -> m (Maybe a)
memcachedLimitedKeyBy lK burst rate tokens mExp k act =
  memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift lK burst rate tokens act
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedBy' using the source location
-- reported by 'location' as cache key.
memcachedLimitedHere :: Q Exp
memcachedLimitedHere = location >>= \here ->
  [e| \burst rate tokens mExp act -> fmap unMemcachedUnkeyedLoc <$> memcachedLimitedBy burst rate tokens mExp here (MemcachedUnkeyedLoc <$> act) |]
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedKeyBy' using the source
-- location reported by 'location' as cache key.
memcachedLimitedKeyHere :: Q Exp
memcachedLimitedKeyHere = location >>= \here ->
  [e| \lK burst rate tokens mExp act -> fmap unMemcachedUnkeyedLoc <$> memcachedLimitedKeyBy lK burst rate tokens mExp here (MemcachedUnkeyedLoc <$> act) |]
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedBy' whose cache key is the
-- source location reported by 'location' paired with a user-supplied key.
memcachedLimitedByHere :: Q Exp
memcachedLimitedByHere = location >>= \here ->
  [e| \burst rate tokens mExp k -> withMemcachedKeyedLoc' $ memcachedLimitedBy burst rate tokens mExp (here, k) |]
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedKeyBy' whose cache key is the
-- source location reported by 'location' paired with a user-supplied key.
memcachedLimitedKeyByHere :: Q Exp
memcachedLimitedKeyByHere = location >>= \here ->
  [e| \lK burst rate tokens mExp k -> withMemcachedKeyedLoc' $ memcachedLimitedKeyBy lK burst rate tokens mExp (here, k) |]
|
|
|
|
|
|
-- | Cache the result of a computation that reports, through its 'WriterT' output,
-- for how long the result may be cached.
--
-- The computation is always wrapped in 'cachedByBinary'. If the site setting
-- '_appMemcacheAuth' is enabled it is additionally cached via memcached under
-- the given key, using the smallest 'Expiry' reported; when no expiry was
-- reported the result is not written to memcached. If the setting is disabled
-- the computation is simply run and its output discarded.
memcacheAuth
  :: forall m k a.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => k
  -> WriterT (Maybe (Min Expiry)) m a
  -> m a
memcacheAuth k mx = cachedByBinary k $ do
  mayCache <- getsYesod $ view _appMemcacheAuth
  if mayCache
    then memcachedWith (memcachedByGet k, storeResult) $ runWriterT mx
    else evalWriterT mx
  where
    storeResult (val, mExp) = do
      for_ mExp $ \(Min expiry) -> memcachedBySet (Just expiry) k val
      return val
|
|
|
|
-- | 'memcacheAuth' for a plain action with a fixed expiry, which is reported
-- after the action has run.
memcacheAuth'
  :: forall a m k.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => Expiry
  -> k
  -> m a
  -> m a
memcacheAuth' expiry k act = memcacheAuth k $ lift act <* tell (Just $ Min expiry)
|
|
|
|
-- | 'memcacheAuth' with an upper bound on the expiry: the given expiry is
-- reported before the computation runs, so the effective expiry is the minimum
-- of it and whatever the computation reports itself.
memcacheAuthMax
  :: forall m k a.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => Expiry
  -> k
  -> WriterT (Maybe (Min Expiry)) m a
  -> m a
memcacheAuthMax expiry k act = memcacheAuth k $ tell (Just $ Min expiry) *> act
|
|
|
|
-- | Template Haskell variant of 'memcacheAuth' whose key is the source location
-- reported by 'location' paired with a user-supplied key.
memcacheAuthHere :: Q Exp
memcacheAuthHere = location >>= \here ->
  [e| \k -> withMemcachedKeyedLoc $ memcacheAuth (here, k) |]
|
|
|
|
-- | Template Haskell variant of 'memcacheAuth'' whose key is the source location
-- reported by 'location' paired with a user-supplied key.
memcacheAuthHere' :: Q Exp
memcacheAuthHere' = location >>= \here ->
  [e| \expiry k -> withMemcachedKeyedLoc $ memcacheAuth' expiry (here, k) |]
|
|
|
|
-- | Template Haskell variant of 'memcacheAuthMax' whose key is the source
-- location reported by 'location' paired with a user-supplied key.
memcacheAuthHereMax :: Q Exp
memcacheAuthHereMax = location >>= \here ->
  [e| \expiry k -> withMemcachedKeyedLoc $ memcacheAuthMax expiry (here, k) |]
|
|
|
|
|
|
|
|
-- | Thrown by 'liftAsyncTimeout' when the computation registered under a key
-- has a result type different from the one requested.
data AsyncTimeoutException
  = AsyncTimeoutReturnTypeDoesNotMatchComputationKey
  deriving (Show, Typeable)
  deriving anyclass (Exception)
|
|
|
|
-- | An 'Async' of arbitrary result type, paired with the representation of that type.
data DynamicAsync = forall a. DynamicAsync !(TypeRep a) !(Async a)

-- | Equal iff the result types coincide and the underlying 'Async's are equal.
instance Eq DynamicAsync where
  DynamicAsync repL asyncL == DynamicAsync repR asyncR
    | Just Refl <- testEquality repL repR = asyncL == asyncR
    | otherwise = False
|
|
|
|
-- | Process-global registry of computations started by 'liftAsyncTimeout',
-- indexed by computation key.
--
-- Created via 'unsafePerformIO'; the @NOINLINE@ pragma keeps it a single shared 'TVar'.
memcachedAsync :: TVar (HashMap HashableDynamic DynamicAsync)
memcachedAsync = unsafePerformIO $ newTVarIO HashMap.empty
{-# NOINLINE memcachedAsync #-}
|
|
|
|
-- | Run an action in a background thread shared between all callers using the
-- same computation key, waiting for its result for at most the given time.
--
-- The result is 'Nothing' if the time elapses first; the computation is not
-- cancelled in that case. A newly spawned thread only starts the action after
-- it has been registered in 'memcachedAsync'; if another, still unfinished,
-- computation turns out to be registered under the key, the new thread is
-- cancelled and the registered one is awaited instead.
--
-- The branch taken when 'memcachedAvailable' is false (via 'ifNotM') runs the
-- action directly without any of the above.
--
-- Throws 'AsyncTimeoutReturnTypeDoesNotMatchComputationKey' if the computation
-- registered under the key has a different result type.
--
-- NOTE(review): an entry found by the initial lookup is reused even if it has
-- already finished, and nothing here removes entries from 'memcachedAsync'.
liftAsyncTimeout
  :: forall k'' a m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadUnliftIO m
     , MonadThrow m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a
     )
  => DiffTime
  -> k''
  -> m a -> MaybeT m a
liftAsyncTimeout dt (hashableDynamic -> cK) act = ifNotM memcachedAvailable (lift act) $ do
  delay <- liftIO . newDelay . round $ toRational dt * 1e6

  running <- lift $ do
    existing <- traverse castDynamicAsync . HashMap.lookup cK <=< liftIO $ readTVarIO memcachedAsync
    case existing of
      Just known -> return known
      Nothing -> do
        startAct <- liftIO newEmptyTMVarIO
        spawned <- async $ do
          $logDebugS "liftAsyncTimeout" "Waiting for confirmation..."
          atomically $ takeTMVar startAct
          $logDebugS "liftAsyncTimeout" "Confirmed."
          act
        winner <- atomically $ do
          hm <- readTVar memcachedAsync
          let candidate = DynamicAsync (Refl.typeRep @a) spawned

              -- Decide which computation ends up registered; the state tracks
              -- the one this caller is going to wait for.
              resolve Nothing = return $ Just candidate
              resolve (Just registered') = do
                registered <- castDynamicAsync registered'
                finished <- lift $ is _Just <$> pollSTM registered

                if finished
                  then return $ Just candidate
                  else Just registered' <$ State.put registered

          (hm', chosen) <- runStateT (HashMap.alterF resolve cK hm) spawned
          writeTVar memcachedAsync $! hm'
          return chosen
        if spawned == winner
          then atomically $ putTMVar startAct ()
          else cancel spawned
        return winner

  MaybeT . atomically $ (Nothing <$ waitDelay delay) <|> (Just <$> waitSTM running)
  where
    castDynamicAsync :: forall m'. MonadThrow m' => DynamicAsync -> m' (Async a)
    castDynamicAsync (DynamicAsync tRep v) = case testEquality tRep (Refl.typeRep @a) of
      Just Refl -> return v
      Nothing -> throwM AsyncTimeoutReturnTypeDoesNotMatchComputationKey
|
|
|
|
-- | Read-through caching with a time limit on the computation.
--
-- A cached result is returned directly. Otherwise the action is run through
-- 'liftAsyncTimeout' with the given time limit and computation key; once it
-- completes, its result is stored via the setter.
memcachedTimeoutWith
  :: ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadUnliftIO m
     , MonadThrow m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a
     )
  => (m (Maybe a), a -> m ()) -> DiffTime -> k'' -> m a -> m (Maybe a)
memcachedTimeoutWith (doGet, doSet) dt cK act = runMaybeT $ lift doGet >>= maybe compute return
  where
    compute = liftAsyncTimeout dt cK $ do
      res <- act
      res <$ doSet res
|
|
|
|
-- | 'memcachedTimeoutWith' for values identified by their type only.
memcachedTimeout
  :: ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , MonadUnliftIO m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a, Binary a, NFData a
     )
  => Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a)
memcachedTimeout mExp dt cK act = memcachedTimeoutWith (memcachedGet, memcachedSet mExp) dt cK act
|
|
|
|
-- | 'memcachedTimeoutWith' for values cached under the given key.
memcachedTimeoutBy
  :: ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , MonadUnliftIO m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a)
memcachedTimeoutBy mExp dt cK k act = memcachedTimeoutWith (memcachedByGet k, memcachedBySet mExp k) dt cK act
|
|
|
|
-- | Template Haskell variant of 'memcachedTimeoutBy' using the source location
-- reported by 'location' as cache key.
--
-- The spliced expression takes the optional expiry, the time limit, the
-- computation key and the action; it yields @m (Maybe a)@.
memcachedTimeoutHere :: Q Exp
memcachedTimeoutHere = do
  loc <- location
  -- 'memcachedTimeoutBy' returns @m (Maybe …)@, so the wrapper has to be removed
  -- beneath both functors.
  [e| \mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedTimeoutBy mExp dt cK loc . fmap MemcachedUnkeyedLoc |]
|
|
|
|
-- | Template Haskell variant of 'memcachedTimeoutBy' whose cache key is the
-- source location reported by 'location' paired with a user-supplied key.
--
-- The spliced expression takes the optional expiry, the time limit, the
-- computation key, the cache key and the action; it yields @m (Maybe a)@.
memcachedTimeoutByHere :: Q Exp
memcachedTimeoutByHere = do
  loc <- location
  -- Delegates to 'memcachedTimeoutBy' (not 'memcachedBy', which takes neither a
  -- time limit nor a computation key); 'withMemcachedKeyedLoc'' unwraps beneath
  -- the 'Maybe' of the result.
  [e| \mExp dt cK k -> withMemcachedKeyedLoc' (memcachedTimeoutBy mExp dt cK (loc, k)) |]
|
|
|
|
-- | Rate-limited caching (see 'memcachedLimitedWith') where the computation is
-- additionally subject to a time limit (see 'liftAsyncTimeout'). The value is
-- identified by its type only, which also serves as limiting key.
memcachedLimitedTimeout
  :: forall a k'' m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , MonadUnliftIO m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a, Binary a, NFData a
     )
  => Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> DiffTime
  -> k''
  -> m a
  -> m (Maybe a)
memcachedLimitedTimeout burst rate tokens mExp dt cK act =
  memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) (Proxy :: Proxy a) burst rate tokens act
|
|
|
|
-- | Like 'memcachedLimitedTimeout', with an explicit limiting key.
memcachedLimitedKeyTimeout
  :: forall a k' k'' m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , MonadUnliftIO m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a, Binary a, NFData a
     , Typeable k', Hashable k', Eq k'
     )
  => k'
  -> Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> DiffTime
  -> k''
  -> m a
  -> m (Maybe a)
memcachedLimitedKeyTimeout lK burst rate tokens mExp dt cK act =
  memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) lK burst rate tokens act
|
|
|
|
-- | Rate-limited caching under the given cache key (see 'memcachedLimitedWith')
-- where the computation is additionally subject to a time limit (see
-- 'liftAsyncTimeout'). The result type serves as limiting key.
memcachedLimitedTimeoutBy
  :: forall a k'' k m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , MonadUnliftIO m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a, Binary a, NFData a
     , Binary k
     )
  => Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> DiffTime
  -> k''
  -> k
  -> m a
  -> m (Maybe a)
memcachedLimitedTimeoutBy burst rate tokens mExp dt cK k act =
  memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) (Proxy :: Proxy a) burst rate tokens act
|
|
|
|
-- | Like 'memcachedLimitedTimeoutBy', with an explicit limiting key.
memcachedLimitedKeyTimeoutBy
  :: forall a k' k'' k m.
     ( MonadHandler m, HandlerSite m ~ UniWorX
     , MonadThrow m
     , MonadUnliftIO m
     , Typeable k'', Hashable k'', Eq k''
     , Typeable a, Binary a, NFData a
     , Typeable k', Hashable k', Eq k'
     , Binary k
     )
  => k'
  -> Word64 -- ^ burst-size (tokens)
  -> Word64 -- ^ avg. inverse rate (usec/token)
  -> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
  -> Maybe Expiry
  -> DiffTime
  -> k''
  -> k
  -> m a
  -> m (Maybe a)
memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK k act =
  memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) lK burst rate tokens act
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedTimeoutBy' using the source
-- location reported by 'location' as cache key.
memcachedLimitedTimeoutHere :: Q Exp
memcachedLimitedTimeoutHere = location >>= \here ->
  [e| \burst rate tokens mExp dt cK act -> fmap unMemcachedUnkeyedLoc <$> memcachedLimitedTimeoutBy burst rate tokens mExp dt cK here (MemcachedUnkeyedLoc <$> act) |]
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedKeyTimeoutBy' using the source
-- location reported by 'location' as cache key.
memcachedLimitedKeyTimeoutHere :: Q Exp
memcachedLimitedKeyTimeoutHere = location >>= \here ->
  [e| \lK burst rate tokens mExp dt cK act -> fmap unMemcachedUnkeyedLoc <$> memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK here (MemcachedUnkeyedLoc <$> act) |]
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedTimeoutBy' whose cache key is
-- the source location reported by 'location' paired with a user-supplied key.
memcachedLimitedTimeoutByHere :: Q Exp
memcachedLimitedTimeoutByHere = location >>= \here ->
  [e| \burst rate tokens mExp dt cK k -> withMemcachedKeyedLoc' $ memcachedLimitedTimeoutBy burst rate tokens mExp dt cK (here, k) |]
|
|
|
|
-- | Template Haskell variant of 'memcachedLimitedKeyTimeoutBy' whose cache key
-- is the source location reported by 'location' paired with a user-supplied key.
memcachedLimitedKeyTimeoutByHere :: Q Exp
memcachedLimitedKeyTimeoutByHere = location >>= \here ->
  [e| \lK burst rate tokens mExp dt cK k -> withMemcachedKeyedLoc' $ memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK (here, k) |]
|
|
|