fradrive/src/Handler/Utils/Memcached.hs
Gregor Kleen c68a01d7ae refactor: split foundation & llvm
BREAKING CHANGE: split foundation
2020-08-14 17:02:14 +02:00

566 lines
23 KiB
Haskell

module Handler.Utils.Memcached
( memcachedAvailable
, memcached, memcachedBy
, memcachedHere, memcachedByHere
, memcachedSet, memcachedGet
, memcachedInvalidate, memcachedByInvalidate
, memcachedByGet, memcachedBySet
, memcachedTimeout, memcachedTimeoutBy
, memcachedTimeoutHere, memcachedTimeoutByHere
, memcachedLimited, memcachedLimitedKey, memcachedLimitedBy, memcachedLimitedKeyBy
, memcachedLimitedHere, memcachedLimitedKeyHere, memcachedLimitedByHere, memcachedLimitedKeyByHere
, memcachedLimitedTimeout, memcachedLimitedKeyTimeout, memcachedLimitedTimeoutBy, memcachedLimitedKeyTimeoutBy
, memcachedLimitedTimeoutHere, memcachedLimitedKeyTimeoutHere, memcachedLimitedTimeoutByHere, memcachedLimitedKeyTimeoutByHere
, Expiry
, MemcachedException(..), AsyncTimeoutException(..)
) where
import Import.NoFoundation hiding (utc, exp)
import Foundation.Type
import qualified Database.Memcached.Binary.IO as Memcached
import Data.Bits (Bits(zeroBits), toIntegralSized)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime)
import qualified Data.Binary as Binary
import Crypto.Hash.Algorithms (SHAKE256)
import qualified Data.ByteArray as BA
import Language.Haskell.TH hiding (Type)
import Data.Typeable (typeRep)
import Type.Reflection (typeOf, TypeRep)
import qualified Type.Reflection as Refl (typeRep)
import Data.Type.Equality (TestEquality(..))
import qualified Data.HashMap.Strict as HashMap
import qualified Control.Concurrent.TokenBucket as Concurrent (TokenBucket, newTokenBucket, tokenBucketTryAlloc)
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent.STM.Delay
import qualified Crypto.Saltine.Class as Saltine
import qualified Crypto.Saltine.Internal.ByteSizes as Saltine
import qualified Crypto.Saltine.Core.AEAD as AEAD
import qualified Control.Monad.State.Class as State
type Expiry = Either UTCTime DiffTime
_MemcachedExpiry :: Prism' Expiry Memcached.Expiry
_MemcachedExpiry = prism' fromExpiry toExpiry
where toExpiry (Left utc)
| posix > 2592000 = toIntegralSized posix
| otherwise = Nothing
where posix :: Integer
posix = ceiling $ utcTimeToPOSIXSeconds utc
toExpiry (Right dTime)
| 0 < dTime
, dTime <= 2592000
= Just $ ceiling dTime
| otherwise
= Nothing
fromExpiry n
| n <= 2592000
= Right $ fromIntegral n
| otherwise
= Left . posixSecondsToUTCTime $ fromIntegral n
memcachedAvailable :: ( MonadHandler m, HandlerSite m ~ UniWorX
)
=> m Bool
memcachedAvailable = getsYesod $ is _Just . appMemcached
data MemcachedException = MemcachedException Memcached.MemcachedException
| MemcachedInvalidExpiry Expiry
deriving (Show, Typeable)
deriving anyclass (Exception)
memcachedKey :: ( Typeable a
, Binary k
)
=> AEAD.Key -> Proxy a -> k -> ByteString
memcachedKey (Saltine.encode -> kmacKey) p k = Binary.encode k
& kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey
& BA.convert
memcachedByGet :: forall a k m.
( MonadHandler m, HandlerSite m ~ UniWorX
, Typeable a, Binary a
, Binary k
)
=> k -> m (Maybe a)
memcachedByGet k = runMaybeT $ do
(aeadKey, conn) <- MaybeT $ getsYesod appMemcached
let cKey = memcachedKey aeadKey (Proxy @a) k
encVal <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey conn
$logDebugS "memcached" "Cache hit"
guard $ length encVal >= Saltine.secretBoxNonce + Saltine.secretBoxMac
let (nonceBS, encrypted) = splitAt Saltine.secretBoxNonce encVal
nonce <- hoistMaybe $ Saltine.decode nonceBS
decrypted <- hoistMaybe $ AEAD.aeadOpen aeadKey nonce encrypted cKey
$logDebugS "memcached" "Decryption valid"
case Binary.decodeOrFail $ fromStrict decrypted of
Right (unconsumed, _, v)
| null unconsumed -> do
$logDebugS "memcached" "Deserialization valid"
return v
_other -> mzero
memcachedBySet :: forall a k m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
, Binary k
)
=> Maybe Expiry -> k -> a -> m ()
memcachedBySet mExp k v = do
mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry
mConn <- getsYesod appMemcached
for_ mConn $ \(aeadKey, conn) -> do
nonce <- liftIO AEAD.newNonce
let cKey = memcachedKey aeadKey (Proxy @a) k
encVal = Saltine.encode nonce <> AEAD.aead aeadKey nonce (toStrict $ Binary.encode v) cKey
liftIO $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (fromStrict encVal) conn
$logDebugS "memcached" "Cache store"
memcachedByInvalidate :: forall a k m p.
( MonadHandler m, HandlerSite m ~ UniWorX
, Typeable a
, Binary k
)
=> k -> p a -> m ()
memcachedByInvalidate k _ = maybeT_ $ do
(aeadKey, conn) <- MaybeT $ getsYesod appMemcached
let cKey = memcachedKey aeadKey (Proxy @a) k
hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey conn
newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a }
deriving (Typeable)
deriving newtype (Eq, Ord, Show, Binary)
memcachedGet :: ( MonadHandler m, HandlerSite m ~ UniWorX
, Typeable a, Binary a
)
=> m (Maybe a)
memcachedGet = fmap unMemcachedUnkeyed <$> memcachedByGet ()
memcachedSet :: ( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
)
=> Maybe Expiry -> a -> m ()
memcachedSet mExp = memcachedBySet mExp () . MemcachedUnkeyed
memcachedInvalidate :: forall (a :: Type) m p.
( MonadHandler m, HandlerSite m ~ UniWorX
, Typeable a
)
=> p a -> m ()
memcachedInvalidate _ = memcachedByInvalidate () $ Proxy @(MemcachedUnkeyed a)
memcachedWith :: Monad m
=> (m (Maybe a), a -> m ()) -> m a -> m a
memcachedWith (doGet, doSet) act = do
pRes <- doGet
maybe id (const . return) pRes $ do
res <- act
doSet res
return res
memcached :: ( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
)
=> Maybe Expiry -> m a -> m a
memcached mExp = memcachedWith (memcachedGet, memcachedSet mExp)
memcachedBy :: ( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
, Binary k
)
=> Maybe Expiry -> k -> m a -> m a
memcachedBy mExp k = memcachedWith (memcachedByGet k, memcachedBySet mExp k)
newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a }
deriving (Typeable)
deriving newtype (Eq, Ord, Show, Binary)
memcachedHere :: Q Exp
memcachedHere = do
loc <- location
[e| \mExp -> fmap unMemcachedUnkeyedLoc . memcachedBy mExp loc . fmap MemcachedUnkeyedLoc |]
newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a }
deriving (Typeable)
deriving newtype (Eq, Ord, Show, Binary)
memcachedByHere :: Q Exp
memcachedByHere = do
loc <- location
[e| \mExp k -> fmap unMemcachedKeyedLoc . memcachedBy mExp (loc, k) . fmap MemcachedKeyedLoc |]
data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a
instance Hashable HashableDynamic where
hashWithSalt s (HashableDynamic tRep v) = s `hashWithSalt` tRep `hashWithSalt` v
instance Eq HashableDynamic where
(HashableDynamic tRep v) == (HashableDynamic tRep' v') = case testEquality tRep tRep' of
Just Refl -> v == v'
Nothing -> False
hashableDynamic :: forall a.
( Typeable a, Hashable a, Eq a )
=> a -> HashableDynamic
hashableDynamic v = HashableDynamic (typeOf v) v
memcachedLimit :: TVar (HashMap HashableDynamic Concurrent.TokenBucket)
memcachedLimit = unsafePerformIO . newTVarIO $ HashMap.empty
{-# NOINLINE memcachedLimit #-}
memcachedLimitedWith :: ( MonadIO m
, MonadLogger m
, Typeable k', Hashable k', Eq k'
)
=> (m (Maybe a), a -> m ())
-> (m a -> MaybeT m a) -- ^ Wrap execution on cache miss
-> k' -- ^ Key for limiting
-> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> m a
-> m (Maybe a)
memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate tokens act = runMaybeT $ do
pRes <- lift doGet
maybe id (const . return) pRes $ do
mBucket <- fmap (HashMap.lookup lK) . liftIO $ readTVarIO memcachedLimit
bucket <- case mBucket of
Just bucket -> return bucket
Nothing -> liftIO $ do
bucket <- Concurrent.newTokenBucket
atomically $ do
hm <- readTVar memcachedLimit
let hm' = HashMap.insertWith (flip const) lK bucket hm
writeTVar memcachedLimit $! hm'
return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm'
sufficientTokens <- liftIO $ Concurrent.tokenBucketTryAlloc bucket burst rate tokens
$logDebugS "memcachedLimitedWith" $ "Sufficient tokens: " <> tshow sufficientTokens
guard sufficientTokens
liftAct $ do
res <- act
doSet res
return res
memcachedLimited :: forall a m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
)
=> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> m a
-> m (Maybe a)
memcachedLimited burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift (Proxy @a) burst rate tokens
memcachedLimitedKey :: forall a k' m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
, Typeable k', Hashable k', Eq k'
)
=> k'
-> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> m a
-> m (Maybe a)
memcachedLimitedKey lK burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift lK burst rate tokens
memcachedLimitedBy :: forall a k m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
, Binary k
)
=> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> k
-> m a
-> m (Maybe a)
memcachedLimitedBy burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift (Proxy @a) burst rate tokens
memcachedLimitedKeyBy :: forall a k' k m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, Typeable a, Binary a
, Typeable k', Hashable k', Eq k'
, Binary k
)
=> k'
-> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> k
-> m a
-> m (Maybe a)
memcachedLimitedKeyBy lK burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift lK burst rate tokens
memcachedLimitedHere :: Q Exp
memcachedLimitedHere = do
loc <- location
[e| \burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedBy burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |]
memcachedLimitedKeyHere :: Q Exp
memcachedLimitedKeyHere = do
loc <- location
[e| \lK burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |]
memcachedLimitedByHere :: Q Exp
memcachedLimitedByHere = do
loc <- location
[e| \burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedBy burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |]
memcachedLimitedKeyByHere :: Q Exp
memcachedLimitedKeyByHere = do
loc <- location
[e| \lK burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |]
data AsyncTimeoutException = AsyncTimeoutReturnTypeDoesNotMatchComputationKey
deriving (Show, Typeable)
deriving anyclass (Exception)
data DynamicAsync = forall a. DynamicAsync !(TypeRep a) !(Async a)
instance Eq DynamicAsync where
(DynamicAsync tRep v) == (DynamicAsync tRep' v') = case testEquality tRep tRep' of
Just Refl -> v == v'
Nothing -> False
memcachedAsync :: TVar (HashMap HashableDynamic DynamicAsync)
memcachedAsync = unsafePerformIO . newTVarIO $ HashMap.empty
{-# NOINLINE memcachedAsync #-}
liftAsyncTimeout :: forall k'' a m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadUnliftIO m
, MonadThrow m
, Typeable k'', Hashable k'', Eq k''
, Typeable a
)
=> DiffTime
-> k''
-> m a -> MaybeT m a
liftAsyncTimeout dt (hashableDynamic -> cK) act = ifNotM memcachedAvailable (lift act) $ do
delay <- liftIO . newDelay . round $ toRational dt * 1e6
act' <- lift $ do
existing <- traverse castDynamicAsync . HashMap.lookup cK <=< liftIO $ readTVarIO memcachedAsync
case existing of
Just act' -> return act'
Nothing -> do
startAct <- liftIO newEmptyTMVarIO
act' <- async $ do
$logDebugS "liftAsyncTimeout" "Waiting for confirmation..."
atomically $ takeTMVar startAct
$logDebugS "liftAsyncTimeout" "Confirmed."
act
act'' <- atomically $ do
hm <- readTVar memcachedAsync
let new = DynamicAsync (Refl.typeRep @a) act'
go mOld = case mOld of
Just old' -> do
old <- castDynamicAsync old'
resolved <- lift $ is _Just <$> pollSTM old
if | resolved -> return $ Just new
| otherwise -> do
State.put old
return $ Just old'
Nothing -> return $ Just new
(hm', act'') <- runStateT (HashMap.alterF go cK hm) act'
writeTVar memcachedAsync $! hm'
return act''
if | act' == act'' -> atomically $ putTMVar startAct ()
| otherwise -> cancel act'
return act''
MaybeT . atomically $ (Nothing <$ waitDelay delay) <|> (Just <$> waitSTM act')
where
castDynamicAsync :: forall m'. MonadThrow m' => DynamicAsync -> m' (Async a)
castDynamicAsync (DynamicAsync tRep v)
| Just Refl <- testEquality tRep (Refl.typeRep @a)
= return v
| otherwise
= throwM AsyncTimeoutReturnTypeDoesNotMatchComputationKey
memcachedTimeoutWith :: ( MonadHandler m, HandlerSite m ~ UniWorX
, MonadUnliftIO m
, MonadThrow m
, Typeable k'', Hashable k'', Eq k''
, Typeable a
)
=> (m (Maybe a), a -> m ()) -> DiffTime -> k'' -> m a -> m (Maybe a)
memcachedTimeoutWith (doGet, doSet) dt cK act = runMaybeT $ do
pRes <- lift doGet
maybe id (const . return) pRes $
liftAsyncTimeout dt cK $ do
res <- act
doSet res
return res
memcachedTimeout :: ( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, MonadUnliftIO m
, Typeable k'', Hashable k'', Eq k''
, Typeable a, Binary a
)
=> Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a)
memcachedTimeout mExp = memcachedTimeoutWith (memcachedGet, memcachedSet mExp)
memcachedTimeoutBy :: ( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, MonadUnliftIO m
, Typeable k'', Hashable k'', Eq k''
, Typeable a, Binary a
, Binary k
)
=> Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a)
memcachedTimeoutBy mExp dt cK k = memcachedTimeoutWith (memcachedByGet k, memcachedBySet mExp k) dt cK
memcachedTimeoutHere :: Q Exp
memcachedTimeoutHere = do
loc <- location
[e| \mExp dt cK -> fmap unMemcachedUnkeyedLoc . memcachedTimeoutBy mExp dt cK loc . fmap MemcachedUnkeyedLoc |]
memcachedTimeoutByHere :: Q Exp
memcachedTimeoutByHere = do
loc <- location
[e| \mExp dt cK k -> fmap unMemcachedKeyedLoc . memcachedBy mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]
memcachedLimitedTimeout :: forall a k'' m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, MonadUnliftIO m
, Typeable k'', Hashable k'', Eq k''
, Typeable a, Binary a
)
=> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> DiffTime
-> k''
-> m a
-> m (Maybe a)
memcachedLimitedTimeout burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens
memcachedLimitedKeyTimeout :: forall a k' k'' m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, MonadUnliftIO m
, Typeable k'', Hashable k'', Eq k''
, Typeable a, Binary a
, Typeable k', Hashable k', Eq k'
)
=> k'
-> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> DiffTime
-> k''
-> m a
-> m (Maybe a)
memcachedLimitedKeyTimeout lK burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) lK burst rate tokens
memcachedLimitedTimeoutBy :: forall a k'' k m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, MonadUnliftIO m
, Typeable k'', Hashable k'', Eq k''
, Typeable a, Binary a
, Binary k
)
=> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> DiffTime
-> k''
-> k
-> m a
-> m (Maybe a)
memcachedLimitedTimeoutBy burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens
memcachedLimitedKeyTimeoutBy :: forall a k' k'' k m.
( MonadHandler m, HandlerSite m ~ UniWorX
, MonadThrow m
, MonadUnliftIO m
, Typeable k'', Hashable k'', Eq k''
, Typeable a, Binary a
, Typeable k', Hashable k', Eq k'
, Binary k
)
=> k'
-> Word64 -- ^ burst-size (tokens)
-> Word64 -- ^ avg. inverse rate (usec/token)
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
-> Maybe Expiry
-> DiffTime
-> k''
-> k
-> m a
-> m (Maybe a)
memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) lK burst rate tokens
memcachedLimitedTimeoutHere :: Q Exp
memcachedLimitedTimeoutHere = do
loc <- location
[e| \burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |]
memcachedLimitedKeyTimeoutHere :: Q Exp
memcachedLimitedKeyTimeoutHere = do
loc <- location
[e| \lK burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |]
memcachedLimitedTimeoutByHere :: Q Exp
memcachedLimitedTimeoutByHere = do
loc <- location
[e| \burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]
memcachedLimitedKeyTimeoutByHere :: Q Exp
memcachedLimitedKeyTimeoutByHere = do
loc <- location
[e| \lK burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]