feat(memcached): introduce general purpose memcached
This commit is contained in:
parent
5b3df769dc
commit
e8c2dc5aaa
@ -123,7 +123,7 @@ widget-memcached:
|
||||
host: "_env:WIDGET_MEMCACHED_HOST:"
|
||||
port: "_env:WIDGET_MEMCACHED_PORT:11211"
|
||||
auth: []
|
||||
limit: "_env:WIDGET_MEMCACHED_LIMIT:10"
|
||||
limit: "_env:WIDGET_MEMCACHED_LIMIT:1024"
|
||||
timeout: "_env:WIDGET_MEMCACHED_TIMEOUT:20"
|
||||
base-url: "_env:WIDGET_MEMCACHED_ROOT:"
|
||||
expiration: "_env:WIDGET_MEMCACHED_EXPIRATION:3600"
|
||||
@ -132,10 +132,18 @@ session-memcached:
|
||||
host: "_env:SESSION_MEMCACHED_HOST:"
|
||||
port: "_env:SESSION_MEMCACHED_PORT:11211"
|
||||
auth: []
|
||||
limit: "_env:SESSION_MEMCACHED_LIMIT:10"
|
||||
limit: "_env:SESSION_MEMCACHED_LIMIT:1024"
|
||||
timeout: "_env:SESSION_MEMCACHED_TIMEOUT:20"
|
||||
expiration: "_env:SESSION_MEMCACHED_EXPIRATION:28807"
|
||||
|
||||
memcached:
|
||||
host: "_env:MEMCACHED_HOST:"
|
||||
port: "_env:MEMCACHED_PORT:11211"
|
||||
auth: []
|
||||
limit: "_env:MEMCACHED_LIMIT:1024"
|
||||
timeout: "_env:MEMCACHED_TIMEOUT:20"
|
||||
expiration: "_env:MEMCACHED_EXPIRATION:300"
|
||||
|
||||
server-sessions:
|
||||
idle-timeout: 28807
|
||||
absolute-timeout: 604801
|
||||
|
||||
@ -143,6 +143,8 @@ dependencies:
|
||||
- rfc5051
|
||||
- unidecode
|
||||
- pandoc
|
||||
- token-bucket
|
||||
- async
|
||||
|
||||
other-extensions:
|
||||
- GeneralizedNewtypeDeriving
|
||||
|
||||
@ -175,7 +175,7 @@ makeFoundation appSettings'@AppSettings{..} = do
|
||||
-- logging function. To get out of this loop, we initially create a
|
||||
-- temporary foundation without a real connection pool, get a log function
|
||||
-- from there, and then create the real foundation.
|
||||
let mkFoundation appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID = UniWorX {..}
|
||||
let mkFoundation appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached = UniWorX {..}
|
||||
-- The UniWorX {..} syntax is an example of record wild cards. For more
|
||||
-- information, see:
|
||||
-- https://ocharles.org.uk/blog/posts/2014-12-04-record-wildcards.html
|
||||
@ -189,6 +189,7 @@ makeFoundation appSettings'@AppSettings{..} = do
|
||||
(error "widgetMemcached forced in tempFoundation")
|
||||
(error "JSONWebKeySet forced in tempFoundation")
|
||||
(error "ClusterID forced in tempFoundation")
|
||||
(error "memcached forced in tempFoundation")
|
||||
|
||||
runAppLoggingT tempFoundation $ do
|
||||
$logInfoS "InstanceID" $ UUID.toText appInstanceID
|
||||
@ -226,9 +227,15 @@ makeFoundation appSettings'@AppSettings{..} = do
|
||||
appJSONWebKeySet <- clusterSetting (Proxy :: Proxy 'ClusterJSONWebKeySet) `runSqlPool` sqlPool
|
||||
appClusterID <- clusterSetting (Proxy :: Proxy 'ClusterId) `runSqlPool` sqlPool
|
||||
|
||||
appMemcached <- for appMemcachedConf $ \memcachedConf -> do
|
||||
$logDebugS "setup" "Memcached"
|
||||
memcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterMemcachedKey) `runSqlPool` sqlPool
|
||||
memcached <- createMemcached memcachedConf
|
||||
return (memcachedKey, memcached)
|
||||
|
||||
appSessionStore <- mkSessionStore appSettings' sqlPool `runSqlPool` sqlPool
|
||||
|
||||
let foundation = mkFoundation sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID
|
||||
let foundation = mkFoundation sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached
|
||||
|
||||
-- Return the foundation
|
||||
$logDebugS "setup" "Done"
|
||||
@ -490,10 +497,10 @@ appMain = runResourceT $ do
|
||||
case watchdogMicroSec of
|
||||
Just wInterval
|
||||
| maybe True (== myProcessID) watchdogProcess
|
||||
-> let notifyWatchdog :: forall a. IO a
|
||||
notifyWatchdog = runAppLoggingT foundation $ go Nothing
|
||||
-> let notifyWatchdog :: forall a m'. ( MonadLogger m', MonadIO m') => m' a
|
||||
notifyWatchdog = go Nothing
|
||||
where
|
||||
go :: Maybe (Set (UTCTime, HealthReport)) -> LoggingT IO a
|
||||
go :: Maybe (Set (UTCTime, HealthReport)) -> m' a
|
||||
go pResults = do
|
||||
let delay = floor $ wInterval % 2
|
||||
d <- liftIO $ newDelay delay
|
||||
|
||||
@ -14,6 +14,7 @@ import Jobs.Types
|
||||
import Yesod.Core.Types (Logger)
|
||||
|
||||
import qualified Crypto.Saltine.Core.SecretBox as SecretBox
|
||||
import qualified Crypto.Saltine.Core.AEAD as AEAD
|
||||
import qualified Jose.Jwk as Jose
|
||||
|
||||
import qualified Database.Memcached.Binary.IO as Memcached
|
||||
@ -49,6 +50,7 @@ data UniWorX = UniWorX
|
||||
, appSecretBoxKey :: SecretBox.Key
|
||||
, appJSONWebKeySet :: Jose.JwkSet
|
||||
, appHealthReport :: TVar (Set (UTCTime, HealthReport))
|
||||
, appMemcached :: Maybe (AEAD.Key, Memcached.Connection)
|
||||
}
|
||||
|
||||
makeLenses_ ''UniWorX
|
||||
|
||||
@ -24,6 +24,7 @@ import Handler.Utils.I18n as Handler.Utils
|
||||
import Handler.Utils.Widgets as Handler.Utils
|
||||
import Handler.Utils.Database as Handler.Utils
|
||||
import Handler.Utils.Occurrences as Handler.Utils
|
||||
import Handler.Utils.Memcached as Handler.Utils
|
||||
|
||||
import Control.Monad.Logger
|
||||
|
||||
|
||||
526
src/Handler/Utils/Memcached.hs
Normal file
526
src/Handler/Utils/Memcached.hs
Normal file
@ -0,0 +1,526 @@
|
||||
module Handler.Utils.Memcached
|
||||
( memcached, memcachedBy
|
||||
, memcachedHere, memcachedByHere
|
||||
, memcachedSet, memcachedGet
|
||||
, memcachedByGet, memcachedBySet
|
||||
, memcachedTimeout, memcachedTimeoutBy
|
||||
, memcachedTimeoutHere, memcachedTimeoutByHere
|
||||
, memcachedLimited, memcachedLimitedKey, memcachedLimitedBy, memcachedLimitedKeyBy
|
||||
, memcachedLimitedHere, memcachedLimitedKeyHere, memcachedLimitedByHere, memcachedLimitedKeyByHere
|
||||
, memcachedLimitedTimeout, memcachedLimitedKeyTimeout, memcachedLimitedTimeoutBy, memcachedLimitedKeyTimeoutBy
|
||||
, memcachedLimitedTimeoutHere, memcachedLimitedKeyTimeoutHere, memcachedLimitedTimeoutByHere, memcachedLimitedKeyTimeoutByHere
|
||||
, Expiry
|
||||
, MemcachedException(..), AsyncTimeoutException(..)
|
||||
) where
|
||||
|
||||
import Import hiding (utc, exp)
|
||||
|
||||
|
||||
import qualified Database.Memcached.Binary.IO as Memcached
|
||||
import Data.Bits (Bits(zeroBits), toIntegralSized)
|
||||
|
||||
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime)
|
||||
|
||||
import qualified Data.Binary as Binary
|
||||
|
||||
import qualified Crypto.MAC.KMAC as KMAC
|
||||
import Crypto.Hash.Algorithms (SHAKE256)
|
||||
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
import qualified Data.ByteArray as BA
|
||||
|
||||
import Language.Haskell.TH
|
||||
|
||||
import Data.Typeable (typeRep)
|
||||
import Type.Reflection (typeOf, TypeRep)
|
||||
import qualified Type.Reflection as Refl (typeRep)
|
||||
import Data.Type.Equality (TestEquality(..))
|
||||
|
||||
import qualified Data.HashMap.Strict as HashMap
|
||||
|
||||
import Control.Concurrent.TokenBucket (TokenBucket, newTokenBucket, tokenBucketTryAlloc)
|
||||
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
||||
import Control.Concurrent.STM.Delay
|
||||
|
||||
import qualified Crypto.Saltine.Class as Saltine
|
||||
import qualified Crypto.Saltine.Internal.ByteSizes as Saltine
|
||||
import qualified Crypto.Saltine.Core.AEAD as AEAD
|
||||
|
||||
import qualified Control.Monad.State.Class as State
|
||||
|
||||
|
||||
type Expiry = (Either UTCTime DiffTime)
|
||||
|
||||
_MemcachedExpiry :: Prism' Expiry Memcached.Expiry
|
||||
_MemcachedExpiry = prism' fromExpiry toExpiry
|
||||
where toExpiry (Left utc)
|
||||
| posix > 2592000 = toIntegralSized posix
|
||||
| otherwise = Nothing
|
||||
where posix :: Integer
|
||||
posix = ceiling $ utcTimeToPOSIXSeconds utc
|
||||
toExpiry (Right dTime)
|
||||
| 0 < dTime
|
||||
, dTime <= 2592000
|
||||
= Just $ ceiling dTime
|
||||
| otherwise
|
||||
= Nothing
|
||||
|
||||
fromExpiry n
|
||||
| n <= 2592000
|
||||
= Right $ fromIntegral n
|
||||
| otherwise
|
||||
= Left . posixSecondsToUTCTime $ fromIntegral n
|
||||
|
||||
|
||||
data MemcachedException = MemcachedException Memcached.MemcachedException
|
||||
| MemcachedInvalidExpiry Expiry
|
||||
deriving (Show, Typeable)
|
||||
deriving anyclass (Exception)
|
||||
|
||||
|
||||
memcachedKey :: ( Typeable a
|
||||
, Binary k
|
||||
)
|
||||
=> AEAD.Key -> Proxy a -> k -> ByteString
|
||||
memcachedKey (Saltine.encode -> kmacKey) p k = Binary.encode k
|
||||
& KMAC.finalize . KMAC.updates (KMAC.initialize @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey) . LBS.toChunks
|
||||
& BA.convert
|
||||
|
||||
memcachedByGet :: forall a k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, Typeable a, Binary a
|
||||
, Binary k
|
||||
)
|
||||
=> k -> m (Maybe a)
|
||||
memcachedByGet k = runMaybeT $ do
|
||||
(aeadKey, conn) <- MaybeT $ getsYesod appMemcached
|
||||
let cKey = memcachedKey aeadKey (Proxy @a) k
|
||||
|
||||
encVal <- fmap toStrict . hoist liftIO . catchIfMaybeT Memcached.isItemNotStored $ Memcached.get_ cKey conn
|
||||
|
||||
guard $ length encVal >= Saltine.secretBoxNonce + Saltine.secretBoxMac
|
||||
let (nonceBS, encrypted) = splitAt Saltine.secretBoxNonce encVal
|
||||
nonce <- hoistMaybe $ Saltine.decode nonceBS
|
||||
decrypted <- hoistMaybe $ AEAD.aeadOpen aeadKey nonce encrypted cKey
|
||||
|
||||
case Binary.decodeOrFail $ fromStrict decrypted of
|
||||
Right (unconsumed, _, v)
|
||||
| null unconsumed -> return v
|
||||
_other -> mzero
|
||||
|
||||
memcachedBySet :: forall a k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Binary k
|
||||
)
|
||||
=> Maybe Expiry -> k -> a -> m ()
|
||||
memcachedBySet mExp k v = do
|
||||
mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry
|
||||
mConn <- getsYesod appMemcached
|
||||
for_ mConn $ \(aeadKey, conn) -> do
|
||||
nonce <- liftIO AEAD.newNonce
|
||||
let cKey = memcachedKey aeadKey (Proxy @a) k
|
||||
encVal = Saltine.encode nonce <> AEAD.aead aeadKey nonce (toStrict $ Binary.encode v) cKey
|
||||
liftIO $ Memcached.add zeroBits (fromMaybe zeroBits mExp') cKey (fromStrict encVal) conn
|
||||
|
||||
|
||||
newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a }
|
||||
deriving (Typeable)
|
||||
deriving newtype (Eq, Ord, Show, Binary)
|
||||
|
||||
memcachedGet :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, Typeable a, Binary a
|
||||
)
|
||||
=> m (Maybe a)
|
||||
memcachedGet = fmap unMemcachedUnkeyed <$> memcachedByGet ()
|
||||
|
||||
memcachedSet :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
)
|
||||
=> Maybe Expiry -> a -> m ()
|
||||
memcachedSet mExp = memcachedBySet mExp () . MemcachedUnkeyed
|
||||
|
||||
|
||||
memcachedWith :: Monad m
|
||||
=> (m (Maybe a), a -> m ()) -> m a -> m a
|
||||
memcachedWith (doGet, doSet) act = do
|
||||
pRes <- doGet
|
||||
maybe id (const . return) pRes $ do
|
||||
res <- act
|
||||
doSet res
|
||||
return res
|
||||
|
||||
memcached :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
)
|
||||
=> Maybe Expiry -> m a -> m a
|
||||
memcached mExp = memcachedWith (memcachedGet, memcachedSet mExp)
|
||||
|
||||
memcachedBy :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Binary k
|
||||
)
|
||||
=> Maybe Expiry -> k -> m a -> m a
|
||||
memcachedBy mExp k = memcachedWith (memcachedByGet k, memcachedBySet mExp k)
|
||||
|
||||
|
||||
newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a }
|
||||
deriving (Typeable)
|
||||
deriving newtype (Eq, Ord, Show, Binary)
|
||||
|
||||
memcachedHere :: Q Exp
|
||||
memcachedHere = do
|
||||
loc <- location
|
||||
[e| \mExp -> fmap unMemcachedUnkeyedLoc . memcachedBy mExp loc . fmap MemcachedUnkeyedLoc |]
|
||||
|
||||
newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a }
|
||||
deriving (Typeable)
|
||||
deriving newtype (Eq, Ord, Show, Binary)
|
||||
|
||||
memcachedByHere :: Q Exp
|
||||
memcachedByHere = do
|
||||
loc <- location
|
||||
[e| \mExp k -> fmap unMemcachedKeyedLoc . memcachedBy mExp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
|
||||
|
||||
data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a
|
||||
|
||||
instance Hashable HashableDynamic where
|
||||
hashWithSalt s (HashableDynamic tRep v) = s `hashWithSalt` tRep `hashWithSalt` v
|
||||
instance Eq HashableDynamic where
|
||||
(HashableDynamic tRep v) == (HashableDynamic tRep' v') = case testEquality tRep tRep' of
|
||||
Just Refl -> v == v'
|
||||
Nothing -> False
|
||||
|
||||
hashableDynamic :: forall a.
|
||||
( Typeable a, Hashable a, Eq a )
|
||||
=> a -> HashableDynamic
|
||||
hashableDynamic v = HashableDynamic (typeOf v) v
|
||||
|
||||
memcachedLimit :: TVar (HashMap HashableDynamic TokenBucket)
|
||||
memcachedLimit = unsafePerformIO . newTVarIO $ HashMap.empty
|
||||
{-# NOINLINE memcachedLimit #-}
|
||||
|
||||
memcachedLimitedWith :: ( MonadIO m
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
)
|
||||
=> (m (Maybe a), a -> m ())
|
||||
-> (m a -> MaybeT m a) -- ^ Wrap execution on cache miss
|
||||
-> k' -- ^ Key for limiting
|
||||
-> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate tokens act = runMaybeT $ do
|
||||
pRes <- lift doGet
|
||||
maybe id (const . return) pRes $ do
|
||||
mBucket <- fmap (HashMap.lookup lK) . liftIO $ readTVarIO memcachedLimit
|
||||
bucket <- case mBucket of
|
||||
Just bucket -> return bucket
|
||||
Nothing -> liftIO $ do
|
||||
bucket <- newTokenBucket
|
||||
atomically $ do
|
||||
hm <- readTVar memcachedLimit
|
||||
let hm' = HashMap.insertWith (flip const) lK bucket hm
|
||||
writeTVar memcachedLimit $! hm'
|
||||
return $ HashMap.lookupDefault (error "could not insert new token bucket") lK hm'
|
||||
guardM . liftIO $ tokenBucketTryAlloc bucket burst rate tokens
|
||||
|
||||
liftAct $ do
|
||||
res <- act
|
||||
doSet res
|
||||
return res
|
||||
|
||||
memcachedLimited :: forall a m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimited burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift (Proxy @a) burst rate tokens
|
||||
|
||||
memcachedLimitedKey :: forall a k' m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
)
|
||||
=> k'
|
||||
-> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedKey lK burst rate tokens mExp = memcachedLimitedWith (memcachedGet, memcachedSet mExp) lift lK burst rate tokens
|
||||
|
||||
memcachedLimitedBy :: forall a k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Binary k
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> k
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedBy burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift (Proxy @a) burst rate tokens
|
||||
|
||||
memcachedLimitedKeyBy :: forall a k' k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
, Binary k
|
||||
)
|
||||
=> k'
|
||||
-> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> k
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedKeyBy lK burst rate tokens mExp k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) lift lK burst rate tokens
|
||||
|
||||
memcachedLimitedHere :: Q Exp
|
||||
memcachedLimitedHere = do
|
||||
loc <- location
|
||||
[e| \burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedBy burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |]
|
||||
|
||||
memcachedLimitedKeyHere :: Q Exp
|
||||
memcachedLimitedKeyHere = do
|
||||
loc <- location
|
||||
[e| \lK burst rate tokens mExp -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp loc . fmap MemcachedUnkeyedLoc |]
|
||||
|
||||
memcachedLimitedByHere :: Q Exp
|
||||
memcachedLimitedByHere = do
|
||||
loc <- location
|
||||
[e| \burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedBy burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
|
||||
memcachedLimitedKeyByHere :: Q Exp
|
||||
memcachedLimitedKeyByHere = do
|
||||
loc <- location
|
||||
[e| \lK burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
|
||||
|
||||
data AsyncTimeoutException = AsyncTimeoutReturnTypeDoesNotMatchComputationKey
|
||||
deriving (Show, Typeable)
|
||||
deriving anyclass (Exception)
|
||||
|
||||
data DynamicAsync = forall a. DynamicAsync !(TypeRep a) !(Async a)
|
||||
|
||||
instance Eq DynamicAsync where
|
||||
(DynamicAsync tRep v) == (DynamicAsync tRep' v') = case testEquality tRep tRep' of
|
||||
Just Refl -> v == v'
|
||||
Nothing -> False
|
||||
|
||||
memcachedAsync :: TVar (HashMap HashableDynamic DynamicAsync)
|
||||
memcachedAsync = unsafePerformIO . newTVarIO $ HashMap.empty
|
||||
{-# NOINLINE memcachedAsync #-}
|
||||
|
||||
liftAsyncTimeout :: forall k'' a m.
|
||||
( MonadResource m, MonadUnliftIO m
|
||||
, MonadThrow m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a
|
||||
)
|
||||
=> DiffTime
|
||||
-> k''
|
||||
-> m a -> MaybeT m a
|
||||
liftAsyncTimeout dt (hashableDynamic -> cK) act = do
|
||||
delay <- liftIO . newDelay . round $ toRational dt / 1e6
|
||||
|
||||
act' <- lift $ do
|
||||
existing <- traverse castDynamicAsync . HashMap.lookup cK <=< liftIO $ readTVarIO memcachedAsync
|
||||
case existing of
|
||||
Just act' -> return act'
|
||||
Nothing -> do
|
||||
startAct <- liftIO newEmptyTMVarIO
|
||||
act' <- allocateLinkedAsync $ do
|
||||
atomically $ takeTMVar startAct
|
||||
act
|
||||
act'' <- atomically $ do
|
||||
hm <- readTVar memcachedAsync
|
||||
let new = DynamicAsync (Refl.typeRep @a) act'
|
||||
go mOld = case mOld of
|
||||
Just old' -> do
|
||||
old <- castDynamicAsync old'
|
||||
resolved <- lift $ is _Just <$> pollSTM old
|
||||
|
||||
if | resolved -> return $ Just new
|
||||
| otherwise -> do
|
||||
State.put old
|
||||
return $ Just old'
|
||||
Nothing -> return $ Just new
|
||||
|
||||
(hm', act'') <- runStateT (HashMap.alterF go cK hm) act'
|
||||
writeTVar memcachedAsync $! hm'
|
||||
return act''
|
||||
if | act' == act'' -> atomically $ putTMVar startAct ()
|
||||
| otherwise -> cancel act'
|
||||
return act''
|
||||
|
||||
MaybeT . atomically $ (Nothing <$ waitDelay delay) <|> (Just <$> waitSTM act')
|
||||
where
|
||||
castDynamicAsync :: forall m'. MonadThrow m' => DynamicAsync -> m' (Async a)
|
||||
castDynamicAsync (DynamicAsync tRep v)
|
||||
| Just Refl <- testEquality tRep (Refl.typeRep @a)
|
||||
= return v
|
||||
| otherwise
|
||||
= throwM AsyncTimeoutReturnTypeDoesNotMatchComputationKey
|
||||
|
||||
memcachedTimeoutWith :: ( MonadResource m, MonadUnliftIO m
|
||||
, MonadThrow m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a
|
||||
)
|
||||
=> (m (Maybe a), a -> m ()) -> DiffTime -> k'' -> m a -> m (Maybe a)
|
||||
memcachedTimeoutWith (doGet, doSet) dt cK act = runMaybeT $ do
|
||||
pRes <- lift doGet
|
||||
maybe id (const . return) pRes $
|
||||
liftAsyncTimeout dt cK $ do
|
||||
res <- act
|
||||
doSet res
|
||||
return res
|
||||
|
||||
memcachedTimeout :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
)
|
||||
=> Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a)
|
||||
memcachedTimeout mExp = memcachedTimeoutWith (memcachedGet, memcachedSet mExp)
|
||||
|
||||
memcachedTimeoutBy :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Binary k
|
||||
)
|
||||
=> Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a)
|
||||
memcachedTimeoutBy mExp dt cK k = memcachedTimeoutWith (memcachedByGet k, memcachedBySet mExp k) dt cK
|
||||
|
||||
memcachedTimeoutHere :: Q Exp
|
||||
memcachedTimeoutHere = do
|
||||
loc <- location
|
||||
[e| \mExp dt cK -> fmap unMemcachedUnkeyedLoc . memcachedTimeoutBy mExp dt cK loc . fmap MemcachedUnkeyedLoc |]
|
||||
|
||||
memcachedTimeoutByHere :: Q Exp
|
||||
memcachedTimeoutByHere = do
|
||||
loc <- location
|
||||
[e| \mExp dt cK k -> fmap unMemcachedKeyedLoc . memcachedBy mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
|
||||
memcachedLimitedTimeout :: forall a k'' m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> DiffTime
|
||||
-> k''
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedTimeout burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens
|
||||
|
||||
memcachedLimitedKeyTimeout :: forall a k' k'' m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
)
|
||||
=> k'
|
||||
-> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> DiffTime
|
||||
-> k''
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedKeyTimeout lK burst rate tokens mExp dt cK = memcachedLimitedWith (memcachedGet, memcachedSet mExp) (liftAsyncTimeout dt cK) lK burst rate tokens
|
||||
|
||||
memcachedLimitedTimeoutBy :: forall a k'' k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Binary k
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> DiffTime
|
||||
-> k''
|
||||
-> k
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedTimeoutBy burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) (Proxy @a) burst rate tokens
|
||||
|
||||
memcachedLimitedKeyTimeoutBy :: forall a k' k'' k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
, Binary k
|
||||
)
|
||||
=> k'
|
||||
-> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
-> Word64 -- ^ tokens to allocate; corresponds to expected cost of operation to perform
|
||||
-> Maybe Expiry
|
||||
-> DiffTime
|
||||
-> k''
|
||||
-> k
|
||||
-> m a
|
||||
-> m (Maybe a)
|
||||
memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK k = memcachedLimitedWith (memcachedByGet k, memcachedBySet mExp k) (liftAsyncTimeout dt cK) lK burst rate tokens
|
||||
|
||||
memcachedLimitedTimeoutHere :: Q Exp
|
||||
memcachedLimitedTimeoutHere = do
|
||||
loc <- location
|
||||
[e| \burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |]
|
||||
|
||||
memcachedLimitedKeyTimeoutHere :: Q Exp
|
||||
memcachedLimitedKeyTimeoutHere = do
|
||||
loc <- location
|
||||
[e| \lK burst rate tokens mExp dt cK -> fmap (fmap unMemcachedUnkeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK loc . fmap MemcachedUnkeyedLoc |]
|
||||
|
||||
memcachedLimitedTimeoutByHere :: Q Exp
|
||||
memcachedLimitedTimeoutByHere = do
|
||||
loc <- location
|
||||
[e| \burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedTimeoutBy burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
|
||||
memcachedLimitedKeyTimeoutByHere :: Q Exp
|
||||
memcachedLimitedKeyTimeoutByHere = do
|
||||
loc <- location
|
||||
[e| \lK burst rate tokens mExp dt cK k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyTimeoutBy lK burst rate tokens mExp dt cK (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
|
||||
16
src/Jobs.hs
16
src/Jobs.hs
@ -85,11 +85,9 @@ handleJobs :: ( MonadResource m
|
||||
handleJobs foundation@UniWorX{..}
|
||||
| foundation ^. _appJobWorkers == 0 = return ()
|
||||
| otherwise = do
|
||||
UnliftIO{..} <- askUnliftIO
|
||||
jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> manageJobPool foundation unmask
|
||||
|
||||
jobPoolManager <- allocateLinkedAsyncWithUnmask $ \unmask -> unliftIO $ manageJobPool foundation unmask
|
||||
|
||||
jobCron <- allocateLinkedAsync . unliftIO $ manageCrontab foundation
|
||||
jobCron <- allocateLinkedAsync $ manageCrontab foundation
|
||||
|
||||
let jobWorkers = Map.empty
|
||||
jobWorkerName = const $ error "Unknown worker"
|
||||
@ -121,7 +119,7 @@ manageJobPool :: forall m.
|
||||
, MonadUnliftIO m
|
||||
, MonadMask m
|
||||
)
|
||||
=> UniWorX -> (forall a. IO a -> IO a) -> m ()
|
||||
=> UniWorX -> (forall a. m a -> m a) -> m ()
|
||||
manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
flip runContT return . forever . join . atomically $ asum
|
||||
[ spawnMissingWorkers
|
||||
@ -131,9 +129,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
where
|
||||
shutdownOnException :: m a -> m a
|
||||
shutdownOnException act = do
|
||||
UnliftIO{..} <- askUnliftIO
|
||||
|
||||
actAsync <- allocateLinkedAsyncMasked $ unliftIO act
|
||||
actAsync <- allocateLinkedAsyncMasked act
|
||||
|
||||
let handleExc e = do
|
||||
atomically $ do
|
||||
@ -143,7 +139,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
void $ wait actAsync
|
||||
throwM e
|
||||
|
||||
liftIO (unmask $ wait actAsync) `catchAll` handleExc
|
||||
unmask (wait actAsync) `catchAll` handleExc
|
||||
|
||||
num :: Int
|
||||
num = fromIntegral $ foundation ^. _appJobWorkers
|
||||
@ -178,7 +174,7 @@ manageJobPool foundation@UniWorX{..} unmask = shutdownOnException $
|
||||
$logInfoS logIdent "Started"
|
||||
runConduit $ streamChan .| handleJobs' workerId
|
||||
$logInfoS logIdent "Stopped"
|
||||
worker <- lift . lift $ allocateLinkedAsync runWorker
|
||||
worker <- lift . lift . allocateLinkedAsync $ liftIO runWorker
|
||||
|
||||
tell . Endo $ \cSt -> cSt
|
||||
{ jobWorkers = Map.insert worker chan $ jobWorkers cSt
|
||||
|
||||
@ -59,6 +59,10 @@ dispatchHealthCheckMatchingClusterConfig
|
||||
ourSetting <- getsYesod appClusterID
|
||||
dbSetting <- clusterSetting @'ClusterId
|
||||
return $ Just ourSetting == dbSetting
|
||||
clusterSettingMatches ClusterMemcachedKey = do
|
||||
ourSetting <- getsYesod $ fmap fst . appMemcached
|
||||
dbSetting <- clusterSetting @'ClusterMemcachedKey
|
||||
return $ maybe True ((== dbSetting) . Just) ourSetting
|
||||
|
||||
|
||||
clusterSetting :: forall key.
|
||||
|
||||
@ -158,6 +158,8 @@ data AppSettings = AppSettings
|
||||
|
||||
, appCookieSettings :: RegisteredCookie -> CookieSettings
|
||||
|
||||
, appMemcachedConf :: Maybe MemcachedConf
|
||||
|
||||
, appInitialInstanceID :: Maybe (Either FilePath UUID)
|
||||
, appRibbon :: Maybe Text
|
||||
} deriving Show
|
||||
@ -418,6 +420,7 @@ instance FromJSON AppSettings where
|
||||
]
|
||||
appWidgetMemcachedConf <- assertM validWidgetMemcachedConf <$> o .:? "widget-memcached"
|
||||
appSessionMemcachedConf <- assertM validMemcachedConf <$> o .:? "session-memcached"
|
||||
appMemcachedConf <- assertM validMemcachedConf <$> o .:? "memcached"
|
||||
appRoot <- o .:? "approot"
|
||||
appHost <- fromString <$> o .: "host"
|
||||
appPort <- o .: "port"
|
||||
|
||||
@ -41,6 +41,7 @@ data ClusterSettingsKey
|
||||
| ClusterSecretBoxKey
|
||||
| ClusterJSONWebKeySet
|
||||
| ClusterId
|
||||
| ClusterMemcachedKey
|
||||
deriving (Eq, Ord, Enum, Bounded, Show, Read)
|
||||
|
||||
instance Universe ClusterSettingsKey
|
||||
@ -122,3 +123,9 @@ instance ClusterSetting 'ClusterId where
|
||||
type ClusterSettingValue 'ClusterId = UUID
|
||||
initClusterSetting _ = liftIO getRandom
|
||||
knownClusterSetting _ = ClusterId
|
||||
|
||||
|
||||
instance ClusterSetting 'ClusterMemcachedKey where
|
||||
type ClusterSettingValue 'ClusterMemcachedKey = AEAD.Key
|
||||
initClusterSetting _ = liftIO AEAD.newKey
|
||||
knownClusterSetting _ = ClusterMemcachedKey
|
||||
|
||||
@ -7,33 +7,34 @@ module UnliftIO.Async.Utils
|
||||
import ClassyPrelude hiding (cancel, async, link)
|
||||
import Control.Lens
|
||||
|
||||
import UnliftIO.Async
|
||||
import qualified UnliftIO.Async as UnliftIO
|
||||
import qualified Control.Concurrent.Async as A
|
||||
|
||||
import Control.Monad.Trans.Resource
|
||||
|
||||
|
||||
allocateAsync :: forall m a.
|
||||
MonadResource m
|
||||
=> IO a -> m (Async a)
|
||||
allocateAsync = fmap (view _2) . flip allocate cancel . liftIO . async
|
||||
( MonadUnliftIO m, MonadResource m )
|
||||
=> m a -> m (Async a)
|
||||
allocateAsync act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel . A.async $ run act
|
||||
|
||||
allocateLinkedAsync :: forall m a. (MonadUnliftIO m, MonadResource m) => IO a -> m (Async a)
|
||||
allocateLinkedAsync = uncurry (<$) . (id &&& link) <=< allocateAsync
|
||||
allocateLinkedAsync :: forall m a. (MonadUnliftIO m, MonadResource m) => m a -> m (Async a)
|
||||
allocateLinkedAsync = uncurry (<$) . (id &&& UnliftIO.link) <=< allocateAsync
|
||||
|
||||
|
||||
allocateAsyncWithUnmask :: forall m a.
|
||||
MonadResource m
|
||||
=> ((forall b. IO b -> IO b) -> IO a) -> m (Async a)
|
||||
allocateAsyncWithUnmask act = fmap (view _2) . flip allocate cancel . liftIO $ asyncWithUnmask act
|
||||
( MonadUnliftIO m, MonadResource m )
|
||||
=> ((forall b. m b -> m b) -> m a) -> m (Async a)
|
||||
allocateAsyncWithUnmask act = withRunInIO $ \run -> run . fmap (view _2) . flip allocate A.cancel $ A.asyncWithUnmask $ \unmask -> run $ act (liftIO . unmask . run)
|
||||
|
||||
allocateLinkedAsyncWithUnmask :: forall m a. (MonadUnliftIO m, MonadResource m) => ((forall b. IO b -> IO b) -> IO a) -> m (Async a)
|
||||
allocateLinkedAsyncWithUnmask act = uncurry (<$) . (id &&& link) =<< allocateAsyncWithUnmask act
|
||||
allocateLinkedAsyncWithUnmask :: forall m a. (MonadUnliftIO m, MonadResource m) => ((forall b. m b -> m b) -> m a) -> m (Async a)
|
||||
allocateLinkedAsyncWithUnmask act = uncurry (<$) . (id &&& UnliftIO.link) =<< allocateAsyncWithUnmask act
|
||||
|
||||
|
||||
allocateAsyncMasked :: forall m a.
|
||||
MonadResource m
|
||||
=> IO a -> m (Async a)
|
||||
allocateAsyncMasked act = fmap (view _2) . flip allocate cancel . liftIO $ asyncWithUnmask (const act)
|
||||
( MonadUnliftIO m, MonadResource m )
|
||||
=> m a -> m (Async a)
|
||||
allocateAsyncMasked act = allocateAsyncWithUnmask (const act)
|
||||
|
||||
allocateLinkedAsyncMasked :: forall m a. (MonadUnliftIO m, MonadResource m) => IO a -> m (Async a)
|
||||
allocateLinkedAsyncMasked = uncurry (<$) . (id &&& link) <=< allocateAsyncMasked
|
||||
allocateLinkedAsyncMasked :: forall m a. (MonadUnliftIO m, MonadResource m) => m a -> m (Async a)
|
||||
allocateLinkedAsyncMasked act = allocateLinkedAsyncWithUnmask (const act)
|
||||
|
||||
@ -104,5 +104,7 @@ extra-deps:
|
||||
|
||||
- unidecode-0.1.0.4
|
||||
|
||||
- token-bucket-0.1.0.1
|
||||
|
||||
resolver: lts-15.0
|
||||
allow-newer: true
|
||||
|
||||
@ -281,6 +281,13 @@ packages:
|
||||
sha256: 4959068a0caf410dd4b8046f0b0138e3cf6471abb0cc865c9993db3b2930d283
|
||||
original:
|
||||
hackage: unidecode-0.1.0.4
|
||||
- completed:
|
||||
hackage: token-bucket-0.1.0.1@sha256:d8e85f2fc373939975e7ace7907baee177531ab6e43df94e330a2357e64a2d11,1899
|
||||
pantry-tree:
|
||||
size: 399
|
||||
sha256: b0b4a08ea1bf76bd108310f64d7f80e0f30b61ddc3d71f6cab7bdce329d2c1fa
|
||||
original:
|
||||
hackage: token-bucket-0.1.0.1
|
||||
snapshots:
|
||||
- completed:
|
||||
size: 488576
|
||||
|
||||
Loading…
Reference in New Issue
Block a user