feat: additional general purpose caching tier (memcachedLocal)
This commit is contained in:
parent
d65bd6889b
commit
939ab37588
@ -157,6 +157,10 @@ memcached:
|
||||
limit: "_env:MEMCACHED_LIMIT:1024"
|
||||
timeout: "_env:MEMCACHED_TIMEOUT:20"
|
||||
expiration: "_env:MEMCACHED_EXPIRATION:300"
|
||||
memcache-auth: true
|
||||
memcached-local:
|
||||
maximum-ghost: 512
|
||||
maximum-weight: 104857600 # 100MiB
|
||||
|
||||
upload-cache:
|
||||
host: "_env:UPLOAD_S3_HOST:"
|
||||
@ -269,8 +273,6 @@ fallback-personalised-sheet-files-keys-expire: 2419200
|
||||
|
||||
download-token-expire: 604801
|
||||
|
||||
memcache-auth: true
|
||||
|
||||
file-source-arc:
|
||||
maximum-ghost: 512
|
||||
maximum-weight: 1073741824 # 1GiB
|
||||
|
||||
@ -112,6 +112,9 @@ import qualified Data.IntervalMap.Strict as IntervalMap
|
||||
|
||||
import qualified Utils.Pool as Custom
|
||||
|
||||
import Utils.Postgresql
|
||||
import Handler.Utils.Memcached (manageMemcachedLocalInvalidations)
|
||||
|
||||
-- Import all relevant handler modules here.
|
||||
-- (HPack takes care to add new modules to our cabal file nowadays.)
|
||||
import Handler.News
|
||||
@ -208,7 +211,7 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
-- from there, and then create the real foundation.
|
||||
let
|
||||
mkFoundation :: _ -> (forall m. MonadIO m => Custom.Pool' m DBConnLabel DBConnUseState SqlBackend) -> _
|
||||
mkFoundation appSettings' appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey = UniWorX {..}
|
||||
mkFoundation appSettings' appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appMemcachedLocal appUploadCache appVerpSecret appAuthKey = UniWorX {..}
|
||||
-- The UniWorX {..} syntax is an example of record wild cards. For more
|
||||
-- information, see:
|
||||
-- https://ocharles.org.uk/blog/posts/2014-12-04-record-wildcards.html
|
||||
@ -224,6 +227,7 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
(error "JSONWebKeySet forced in tempFoundation")
|
||||
(error "ClusterID forced in tempFoundation")
|
||||
(error "memcached forced in tempFoundation")
|
||||
(error "memcachedLocal forced in tempFoundation")
|
||||
(error "MinioConn forced in tempFoundation")
|
||||
(error "VerpSecret forced in tempFoundation")
|
||||
(error "AuthKey forced in tempFoundation")
|
||||
@ -295,11 +299,17 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
appMemcached <- for appMemcachedConf $ \memcachedConf -> do
|
||||
$logDebugS "setup" "Memcached"
|
||||
memcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterMemcachedKey) `customRunSqlPool` sqlPool
|
||||
memcached <- createMemcached memcachedConf
|
||||
memcachedConn <- createMemcached memcachedConf
|
||||
when appClearCache $ do
|
||||
$logWarnS "setup" "Clearing memcached"
|
||||
liftIO $ Memcached.flushAll memcached
|
||||
return (memcachedKey, memcached)
|
||||
liftIO $ Memcached.flushAll memcachedConn
|
||||
return AppMemcached{..}
|
||||
appMemcachedLocal <- for appMemcachedLocalConf $ \ARCConf{..} -> do
|
||||
memcachedLocalARC <- initARCHandle arccMaximumGhost arccMaximumWeight
|
||||
void . Prometheus.register $ arcMetrics ARCMemcachedLocal memcachedLocalARC
|
||||
memcachedLocalInvalidationQueue <- newTVarIO mempty
|
||||
memcachedLocalHandleInvalidations <- allocateLinkedAsync . managePostgresqlChannel appDatabaseConf ChannelMemcachedLocalInvalidation $ manageMemcachedLocalInvalidations memcachedLocalARC memcachedLocalInvalidationQueue
|
||||
return AppMemcachedLocal{..}
|
||||
|
||||
appSessionStore <- mkSessionStore appSettings'' sqlPool `customRunSqlPool` sqlPool
|
||||
|
||||
@ -314,7 +324,7 @@ makeFoundation appSettings''@AppSettings{..} = do
|
||||
|
||||
$logDebugS "Runtime configuration" $ tshow appSettings'
|
||||
|
||||
let foundation = mkFoundation appSettings' sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey
|
||||
let foundation = mkFoundation appSettings' sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appMemcachedLocal appUploadCache appVerpSecret appAuthKey
|
||||
|
||||
-- Return the foundation
|
||||
$logDebugS "setup" "Done"
|
||||
@ -671,7 +681,7 @@ shutdownApp app = do
|
||||
for_ (appSmtpPool app) destroyAllResources
|
||||
for_ (appLdapPool app) . mapFailover $ views _2 destroyAllResources
|
||||
for_ (appWidgetMemcached app) Memcached.close
|
||||
for_ (appMemcached app) $ views _2 Memcached.close
|
||||
for_ (appMemcached app) $ views _memcachedConn Memcached.close
|
||||
release . fst $ appLogger app
|
||||
|
||||
liftIO $ threadDelay 1e6
|
||||
|
||||
@ -24,3 +24,4 @@ instance BackendCompatible SqlWriteBackend SqlBackend where
|
||||
projectBackend = SqlWriteBackend
|
||||
|
||||
deriving newtype instance Binary (BackendKey SqlBackend)
|
||||
deriving anyclass instance NFData (BackendKey SqlBackend)
|
||||
|
||||
@ -157,7 +157,7 @@ instance (MonadHandler m, HandlerSite m ~ UniWorX, BackendCompatible SqlReadBack
|
||||
-- cacheAP mExp k mkV cont = APBind $ \mAuthId route isWrite -> either (return . Left) (fmap Right) . cont mAuthId route isWrite =<< memcachedBy mExp k mkV
|
||||
|
||||
cacheAPDB :: ( Binary k
|
||||
, Typeable v, Binary v
|
||||
, Typeable v, Binary v, NFData v
|
||||
)
|
||||
=> Maybe Expiry
|
||||
-> k
|
||||
@ -185,7 +185,7 @@ cacheAPDB mExp k mkV cont = APBindDB $ \mAuthId route isWrite -> do
|
||||
-- Nothing -> either (return . Left) (fmap Right) $ cont mAuthId route isWrite Nothing
|
||||
|
||||
cacheAPDB' :: ( Binary k
|
||||
, Typeable v, Binary v
|
||||
, Typeable v, Binary v, NFData v
|
||||
)
|
||||
=> Maybe Expiry
|
||||
-> (Maybe (AuthId UniWorX) -> Route UniWorX -> Bool -> Maybe (k, ReaderT SqlReadBackend (HandlerFor UniWorX) v))
|
||||
|
||||
@ -5,6 +5,10 @@ module Foundation.Type
|
||||
( UniWorX(..)
|
||||
, SomeSessionStorage(..)
|
||||
, _SessionStorageMemcachedSql, _SessionStorageAcid
|
||||
, AppMemcached(..)
|
||||
, _memcachedKey, _memcachedConn
|
||||
, AppMemcachedLocal(..)
|
||||
, _memcachedLocalARC
|
||||
, SMTPPool
|
||||
, _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport, _appMemcached, _appUploadCache, _appVerpSecret, _appAuthKey
|
||||
, DB, Form, MsgRenderer, MailM, DBFile
|
||||
@ -30,15 +34,35 @@ import qualified Utils.Pool as Custom
|
||||
|
||||
import Utils.Metrics (DBConnUseState)
|
||||
|
||||
import qualified Data.ByteString.Lazy as Lazy
|
||||
import Data.Time.Clock.POSIX (POSIXTime)
|
||||
import GHC.Fingerprint (Fingerprint)
|
||||
|
||||
|
||||
type SMTPPool = Pool SMTPConnection
|
||||
|
||||
data SomeSessionStorage
|
||||
= SessionStorageMemcachedSql { sessionStorageMemcachedSql :: MemcachedSqlStorage SessionMap }
|
||||
| SessionStorageAcid { sessionStorageAcid :: AcidStorage SessionMap }
|
||||
deriving (Generic, Typeable)
|
||||
|
||||
makePrisms ''SomeSessionStorage
|
||||
|
||||
data AppMemcached = AppMemcached
|
||||
{ memcachedKey :: AEAD.Key
|
||||
, memcachedConn :: Memcached.Connection
|
||||
} deriving (Generic, Typeable)
|
||||
|
||||
makeLenses_ ''AppMemcached
|
||||
|
||||
data AppMemcachedLocal = AppMemcachedLocal
|
||||
{ memcachedLocalARC :: ARCHandle (Fingerprint, Lazy.ByteString) Int (NFDynamic, Maybe POSIXTime)
|
||||
, memcachedLocalHandleInvalidations :: Async ()
|
||||
, memcachedLocalInvalidationQueue :: TVar (Seq (Fingerprint, Lazy.ByteString))
|
||||
} deriving (Generic, Typeable)
|
||||
|
||||
makeLenses_ ''AppMemcachedLocal
|
||||
|
||||
-- | The foundation datatype for your application. This can be a good place to
|
||||
-- keep settings and values requiring initialization before your application
|
||||
-- starts running, such as database connections. Every handler will have
|
||||
@ -61,14 +85,15 @@ data UniWorX = UniWorX
|
||||
, appSecretBoxKey :: SecretBox.Key
|
||||
, appJSONWebKeySet :: Jose.JwkSet
|
||||
, appHealthReport :: TVar (Set (UTCTime, HealthReport))
|
||||
, appMemcached :: Maybe (AEAD.Key, Memcached.Connection)
|
||||
, appMemcached :: Maybe AppMemcached
|
||||
, appMemcachedLocal :: Maybe AppMemcachedLocal
|
||||
, appUploadCache :: Maybe MinioConn
|
||||
, appVerpSecret :: VerpSecret
|
||||
, appAuthKey :: Auth.Key
|
||||
, appFileSourceARC :: Maybe (ARCHandle (FileContentChunkReference, (Int, Int)) Int ByteString)
|
||||
, appFileSourcePrewarm :: Maybe (LRUHandle (FileContentChunkReference, (Int, Int)) UTCTime Int ByteString)
|
||||
, appFileInjectInhibit :: TVar (IntervalMap UTCTime (Set FileContentReference))
|
||||
}
|
||||
} deriving (Typeable)
|
||||
|
||||
makeLenses_ ''UniWorX
|
||||
instance HasInstanceID UniWorX InstanceId where
|
||||
|
||||
@ -22,7 +22,7 @@ import Handler.Utils.I18n as Handler.Utils
|
||||
import Handler.Utils.Widgets as Handler.Utils
|
||||
import Handler.Utils.Database as Handler.Utils
|
||||
import Handler.Utils.Occurrences as Handler.Utils
|
||||
import Handler.Utils.Memcached as Handler.Utils
|
||||
import Handler.Utils.Memcached as Handler.Utils hiding (manageMemcachedLocalInvalidations)
|
||||
import Handler.Utils.Files as Handler.Utils
|
||||
import Handler.Utils.Download as Handler.Utils
|
||||
|
||||
|
||||
@ -1,9 +1,12 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
|
||||
module Handler.Utils.Memcached
|
||||
( memcachedAvailable
|
||||
, memcached, memcachedBy
|
||||
, memcachedHere, memcachedByHere
|
||||
, memcachedSet, memcachedGet
|
||||
, memcachedInvalidate, memcachedByInvalidate
|
||||
, manageMemcachedLocalInvalidations
|
||||
, memcachedByGet, memcachedBySet
|
||||
, memcachedTimeout, memcachedTimeoutBy
|
||||
, memcachedTimeoutHere, memcachedTimeoutByHere
|
||||
@ -35,9 +38,11 @@ import Crypto.Hash.Algorithms (SHAKE256)
|
||||
|
||||
import qualified Data.ByteArray as BA
|
||||
|
||||
import qualified Data.ByteString.Base64 as Base64
|
||||
|
||||
import Language.Haskell.TH hiding (Type)
|
||||
|
||||
import Data.Typeable (typeRep)
|
||||
import Data.Typeable (typeRep, typeRepFingerprint)
|
||||
import Type.Reflection (typeOf, TypeRep)
|
||||
import qualified Type.Reflection as Refl (typeRep)
|
||||
import Data.Type.Equality (TestEquality(..))
|
||||
@ -58,6 +63,12 @@ import qualified Control.Monad.State.Class as State
|
||||
|
||||
import qualified Data.ByteString.Lazy as Lazy (ByteString)
|
||||
|
||||
import GHC.Fingerprint
|
||||
|
||||
import Utils.Postgresql
|
||||
|
||||
import UnliftIO.Concurrent (threadDelay)
|
||||
|
||||
|
||||
type Expiry = Either UTCTime DiffTime
|
||||
|
||||
@ -112,7 +123,7 @@ putMemcachedValue MemcachedValue{..} = do
|
||||
putExpiry mExpiry
|
||||
Binary.putByteString mCiphertext
|
||||
|
||||
getMemcachedValue :: Binary.Get MemcachedValue
|
||||
getMemcachedValue, getMemcachedValueNoExpiry :: Binary.Get MemcachedValue
|
||||
getMemcachedValue = do
|
||||
Binary.lookAhead . Binary.label "length check" $ do
|
||||
void . Binary.getByteString $ Saltine.secretBoxNonce + 4 + Saltine.secretBoxMac
|
||||
@ -120,8 +131,6 @@ getMemcachedValue = do
|
||||
mExpiry <- getExpiry
|
||||
mCiphertext <- Binary.label "ciphertext" $ toStrict <$> Binary.getRemainingLazyByteString
|
||||
return MemcachedValue{..}
|
||||
|
||||
getMemcachedValueNoExpiry :: Binary.Get MemcachedValue
|
||||
getMemcachedValueNoExpiry = do
|
||||
Binary.lookAhead . Binary.label "length check" $ do
|
||||
void . Binary.getByteString $ Saltine.secretBoxNonce + 4 + Saltine.secretBoxMac
|
||||
@ -130,7 +139,6 @@ getMemcachedValueNoExpiry = do
|
||||
mCiphertext <- Binary.label "ciphertext" $ toStrict <$> Binary.getRemainingLazyByteString
|
||||
return MemcachedValue{..}
|
||||
|
||||
|
||||
memcachedAvailable :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
)
|
||||
=> m Bool
|
||||
@ -143,9 +151,9 @@ data MemcachedException = MemcachedException Memcached.MemcachedException
|
||||
deriving anyclass (Exception)
|
||||
|
||||
|
||||
memcachedKey :: Typeable a
|
||||
toMemcachedKey :: Typeable a
|
||||
=> AEAD.Key -> Proxy a -> Lazy.ByteString -> ByteString
|
||||
memcachedKey (Saltine.encode -> kmacKey) p = BA.convert . kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey
|
||||
toMemcachedKey (Saltine.encode -> kmacKey) p = BA.convert . kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey
|
||||
|
||||
memcachedAAD :: ByteString -> Maybe POSIXTime -> ByteString
|
||||
memcachedAAD cKey mExpiry = toStrict . Binary.runPut $ do
|
||||
@ -154,18 +162,30 @@ memcachedAAD cKey mExpiry = toStrict . Binary.runPut $ do
|
||||
|
||||
memcachedByGet :: forall a k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> k -> m (Maybe a)
|
||||
memcachedByGet (Binary.encode -> k) = runMaybeT $ requestCache <|> memcache
|
||||
memcachedByGet (Binary.encode -> k) = runMaybeT $ arc <|> memcache
|
||||
where
|
||||
requestCache = MaybeT . cacheByGet $ toStrict k
|
||||
arc = do
|
||||
AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal
|
||||
res <- hoistMaybe . preview (_1 . _NFDynamic) <=< hoistMaybe <=< cachedARC' memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) $ \mPrev -> do
|
||||
prev@((_, prevExpiry), _) <- hoistMaybe mPrev
|
||||
$logDebugS "memcached" "Cache hit (local ARC)"
|
||||
lift . runMaybeT $ do -- To delete from ARC upon expiry
|
||||
for_ prevExpiry $ \expiry -> do
|
||||
now <- liftIO getPOSIXTime
|
||||
guard $ expiry > now
|
||||
return prev
|
||||
$logDebugS "memcached" "All valid (local ARC)"
|
||||
return res
|
||||
memcache = do
|
||||
(aeadKey, conn) <- MaybeT $ getsYesod appMemcached
|
||||
let cKey = memcachedKey aeadKey (Proxy @a) k
|
||||
AppMemcached{..} <- MaybeT $ getsYesod appMemcached
|
||||
localARC <- getsYesod appMemcachedLocal
|
||||
let cKey = toMemcachedKey memcachedKey (Proxy @a) k
|
||||
|
||||
encVal <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey conn
|
||||
encVal <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey memcachedConn
|
||||
|
||||
$logDebugS "memcached" "Cache hit"
|
||||
|
||||
@ -177,11 +197,20 @@ memcachedByGet (Binary.encode -> k) = runMaybeT $ requestCache <|> memcache
|
||||
guard $ expiry > now + clockLeniency
|
||||
$logDebugS "memcached" $ "Expiry valid: " <> tshow mExpiry
|
||||
let aad = memcachedAAD cKey mExpiry
|
||||
decrypted <- hoistMaybe $ AEAD.aeadOpen aeadKey mNonce mCiphertext aad
|
||||
decrypted <- hoistMaybe $ AEAD.aeadOpen memcachedKey mNonce mCiphertext aad
|
||||
|
||||
$logDebugS "memcached" $ "Decryption valid " <> bool "without" "with" doExp <> " expiration"
|
||||
|
||||
let withCache = case localARC of
|
||||
Just AppMemcachedLocal{..} -> cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k)
|
||||
Nothing -> fmap (view _1) . ($ Nothing)
|
||||
res <- hoistMaybe . preview (_1 . _NFDynamic) <=< withCache $ \case
|
||||
Nothing -> fmap ((, length decrypted) . (, mExpiry) . review (_NFDynamic @a)) . hoistMaybe $ runGetMaybe Binary.get decrypted
|
||||
Just p -> return p
|
||||
|
||||
hoistMaybe $ runGetMaybe Binary.get decrypted
|
||||
$logDebugS "memcached" "All valid"
|
||||
|
||||
return res
|
||||
|
||||
withExp True <|> withExp False
|
||||
where
|
||||
@ -194,50 +223,112 @@ memcachedByGet (Binary.encode -> k) = runMaybeT $ requestCache <|> memcache
|
||||
memcachedBySet :: forall a k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> Maybe Expiry -> k -> a -> m ()
|
||||
memcachedBySet mExp (Binary.encode -> k) v = do
|
||||
mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry
|
||||
|
||||
let decrypted = toStrict $ Binary.encode v
|
||||
mExpiry <- for mExp $ \case
|
||||
Left uTime -> return $ utcTimeToPOSIXSeconds uTime
|
||||
Right diff -> liftIO $ (+ realToFrac diff) <$> getPOSIXTime
|
||||
|
||||
mConn <- getsYesod appMemcached
|
||||
for_ mConn $ \(aeadKey, conn) -> do
|
||||
for_ mConn $ \AppMemcached{..} -> do
|
||||
mNonce <- liftIO AEAD.newNonce
|
||||
mExpiry <- for mExp $ \case
|
||||
Left uTime -> return $ utcTimeToPOSIXSeconds uTime
|
||||
Right diff -> liftIO $ (+ realToFrac diff) <$> getPOSIXTime
|
||||
let cKey = memcachedKey aeadKey (Proxy @a) k
|
||||
let cKey = toMemcachedKey memcachedKey (Proxy @a) k
|
||||
aad = memcachedAAD cKey mExpiry
|
||||
mCiphertext = AEAD.aead aeadKey mNonce (toStrict $ Binary.encode v) aad
|
||||
liftIO $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (Binary.runPut $ putMemcachedValue MemcachedValue{..}) conn
|
||||
cacheBySet (toStrict k) v
|
||||
mCiphertext = AEAD.aead memcachedKey mNonce decrypted aad
|
||||
liftIO $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (Binary.runPut $ putMemcachedValue MemcachedValue{..}) memcachedConn
|
||||
$logDebugS "memcached" $ "Cache store: " <> tshow mExpiry
|
||||
|
||||
mLocal <- getsYesod appMemcachedLocal
|
||||
for_ mLocal $ \AppMemcachedLocal{..} -> do
|
||||
void . cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) . const $ return ((_NFDynamic # v, mExpiry), length decrypted)
|
||||
$logDebugS "memcached" $ "Cache store: " <> tshow mExpiry <> " (local ARC)"
|
||||
-- DEBUG
|
||||
let inv = Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..}
|
||||
where mLocalInvalidateType = typeRepFingerprint . typeRep $ Proxy @a
|
||||
mLocalInvalidateKey = k
|
||||
$logDebugS "memcached" $ "To invalidate remotely: " <> tshow inv
|
||||
|
||||
memcachedByInvalidate :: forall a k m p.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, Typeable a
|
||||
, Binary k
|
||||
)
|
||||
=> k -> p a -> m ()
|
||||
memcachedByInvalidate (Binary.encode -> k) _ = maybeT_ $ do
|
||||
(aeadKey, conn) <- MaybeT $ getsYesod appMemcached
|
||||
let cKey = memcachedKey aeadKey (Proxy @a) k
|
||||
hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey conn
|
||||
memcachedByInvalidate (Binary.encode -> k) _ = arc >> memcache
|
||||
where
|
||||
memcache = maybeT_ $ do
|
||||
AppMemcached{..} <- MaybeT $ getsYesod appMemcached
|
||||
let cKey = toMemcachedKey memcachedKey (Proxy @a) k
|
||||
hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey memcachedConn
|
||||
$logDebugS "memcached" "Cache invalidation"
|
||||
arc = maybeT_ $ do
|
||||
AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal
|
||||
let arcKey = (typeRepFingerprint . typeRep $ Proxy @a, k)
|
||||
atomically $ modifyTVar' memcachedLocalInvalidationQueue (:> arcKey)
|
||||
void . cachedARC' memcachedLocalARC arcKey . const $ return Nothing
|
||||
$logDebugS "memcached" "Cache invalidation (local ARC)"
|
||||
|
||||
data MemcachedLocalInvalidateMsg = MemcachedLocalInvalidateMsg
|
||||
{ mLocalInvalidateType :: Fingerprint
|
||||
, mLocalInvalidateKey :: Lazy.ByteString
|
||||
} deriving (Eq, Ord, Show, Typeable)
|
||||
|
||||
instance Binary MemcachedLocalInvalidateMsg where
|
||||
get = Binary.label "MemcachedLocalInvalidateMsg" $ do
|
||||
mLocalInvalidateType <- Binary.label "mLocalInvalidateType" $ Fingerprint <$> Binary.getWord64le <*> Binary.getWord64le
|
||||
mLocalInvalidateKey <- Binary.label "mLocalInvalidateKey" Binary.getRemainingLazyByteString
|
||||
return MemcachedLocalInvalidateMsg{..}
|
||||
put MemcachedLocalInvalidateMsg{..} = do
|
||||
let Fingerprint w1 w2 = mLocalInvalidateType
|
||||
Binary.putWord64le w1
|
||||
Binary.putWord64le w2
|
||||
Binary.putLazyByteString mLocalInvalidateKey
|
||||
|
||||
manageMemcachedLocalInvalidations :: ( MonadUnliftIO m
|
||||
, MonadLogger m
|
||||
)
|
||||
=> ARCHandle (Fingerprint, Lazy.ByteString) Int (NFDynamic, Maybe POSIXTime)
|
||||
-> TVar (Seq (Fingerprint, Lazy.ByteString))
|
||||
-> PostgresqlChannelManager m ()
|
||||
manageMemcachedLocalInvalidations localARC iQueue = PostgresqlChannelManager
|
||||
{ pgcTerminate = forever $ threadDelay maxBound
|
||||
, pgcOnInput = Just $ \inpBS -> case Binary.runGetOrFail Binary.get . fromStrict <$> Base64.decode inpBS of
|
||||
Right (Right (bs', _, MemcachedLocalInvalidateMsg{..})) | null bs' ->
|
||||
void . cachedARC' localARC (mLocalInvalidateType, mLocalInvalidateKey) $ \mPrev -> do
|
||||
$logDebugS "memcached" $ "Remote invalidation in local ARC: " <> bool "miss" "hit" (is _Just mPrev)
|
||||
return Nothing
|
||||
_other -> $logErrorS "memcached" $ "Received unparseable remote invalidation: " <> tshow inpBS
|
||||
, pgcGenOutput = atomically $ do
|
||||
iQueue' <- readTVar iQueue
|
||||
i <- case iQueue' of
|
||||
i :< is' -> i <$ writeTVar iQueue is'
|
||||
_other -> mzero
|
||||
let (mLocalInvalidateType, mLocalInvalidateKey) = i
|
||||
return . Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..}
|
||||
}
|
||||
|
||||
|
||||
newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a }
|
||||
deriving (Typeable)
|
||||
deriving newtype (Eq, Ord, Show, Binary)
|
||||
instance NFData a => NFData (MemcachedUnkeyed a) where
|
||||
rnf = rnf . unMemcachedUnkeyed
|
||||
|
||||
memcachedGet :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
)
|
||||
=> m (Maybe a)
|
||||
memcachedGet = fmap unMemcachedUnkeyed <$> memcachedByGet ()
|
||||
|
||||
memcachedSet :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
)
|
||||
=> Maybe Expiry -> a -> m ()
|
||||
memcachedSet mExp = memcachedBySet mExp () . MemcachedUnkeyed
|
||||
@ -260,14 +351,14 @@ memcachedWith (doGet, doSet) act = do
|
||||
|
||||
memcached :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
)
|
||||
=> Maybe Expiry -> m a -> m a
|
||||
memcached mExp = memcachedWith (memcachedGet, \x -> x <$ memcachedSet mExp x)
|
||||
|
||||
memcachedBy :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> Maybe Expiry -> k -> m a -> m a
|
||||
@ -277,6 +368,8 @@ memcachedBy mExp k = memcachedWith (memcachedByGet k, \x -> x <$ memcachedBySet
|
||||
newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a }
|
||||
deriving (Typeable)
|
||||
deriving newtype (Eq, Ord, Show, Binary)
|
||||
instance NFData a => NFData (MemcachedUnkeyedLoc a) where
|
||||
rnf MemcachedUnkeyedLoc{..} = rnf unMemcachedUnkeyedLoc
|
||||
|
||||
memcachedHere :: Q Exp
|
||||
memcachedHere = do
|
||||
@ -286,11 +379,21 @@ memcachedHere = do
|
||||
newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a }
|
||||
deriving (Typeable)
|
||||
deriving newtype (Eq, Ord, Show, Binary)
|
||||
instance NFData a => NFData (MemcachedKeyedLoc a) where
|
||||
rnf MemcachedKeyedLoc{..} = rnf unMemcachedKeyedLoc
|
||||
|
||||
withMemcachedKeyedLoc :: Functor f => (f (MemcachedKeyedLoc a) -> f (MemcachedKeyedLoc a)) -> (f a -> f a)
|
||||
withMemcachedKeyedLoc act = fmap unMemcachedKeyedLoc . act . fmap MemcachedKeyedLoc
|
||||
{-# INLINE withMemcachedKeyedLoc #-}
|
||||
|
||||
withMemcachedKeyedLoc' :: (Functor f, Functor f') => (f (MemcachedKeyedLoc a) -> f (f' (MemcachedKeyedLoc a))) -> (f a -> f (f' a))
|
||||
withMemcachedKeyedLoc' act = fmap (fmap unMemcachedKeyedLoc) . act . fmap MemcachedKeyedLoc
|
||||
{-# INLINE withMemcachedKeyedLoc' #-}
|
||||
|
||||
memcachedByHere :: Q Exp
|
||||
memcachedByHere = do
|
||||
loc <- location
|
||||
[e| \mExp k -> fmap unMemcachedKeyedLoc . memcachedBy mExp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
[e| \mExp k -> withMemcachedKeyedLoc (memcachedBy mExp (loc, k)) |]
|
||||
|
||||
|
||||
data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a
|
||||
@ -348,7 +451,7 @@ memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate t
|
||||
memcachedLimited :: forall a m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
@ -361,7 +464,7 @@ memcachedLimited burst rate tokens mExp = memcachedLimitedWith (memcachedGet, me
|
||||
memcachedLimitedKey :: forall a k' m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
)
|
||||
=> k'
|
||||
@ -376,7 +479,7 @@ memcachedLimitedKey lK burst rate tokens mExp = memcachedLimitedWith (memcachedG
|
||||
memcachedLimitedBy :: forall a k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
@ -391,7 +494,7 @@ memcachedLimitedBy burst rate tokens mExp k = memcachedLimitedWith (memcachedByG
|
||||
memcachedLimitedKeyBy :: forall a k' k m.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
, Binary k
|
||||
)
|
||||
@ -418,18 +521,18 @@ memcachedLimitedKeyHere = do
|
||||
memcachedLimitedByHere :: Q Exp
|
||||
memcachedLimitedByHere = do
|
||||
loc <- location
|
||||
[e| \burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedBy burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
[e| \burst rate tokens mExp k -> withMemcachedKeyedLoc' (memcachedLimitedBy burst rate tokens mExp (loc, k)) |]
|
||||
|
||||
memcachedLimitedKeyByHere :: Q Exp
|
||||
memcachedLimitedKeyByHere = do
|
||||
loc <- location
|
||||
[e| \lK burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
[e| \lK burst rate tokens mExp k -> withMemcachedKeyedLoc' (memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k)) |]
|
||||
|
||||
|
||||
memcacheAuth :: forall m k a.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> k
|
||||
@ -450,7 +553,7 @@ memcacheAuth k mx = cachedByBinary k $ do
|
||||
memcacheAuth' :: forall m k a.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> Expiry
|
||||
@ -462,7 +565,7 @@ memcacheAuth' exp k = memcacheAuth k . (<* tell (Just $ Min exp)) . lift
|
||||
memcacheAuthMax :: forall m k a.
|
||||
( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> Expiry
|
||||
@ -474,17 +577,17 @@ memcacheAuthMax exp k = memcacheAuth k . (tell (Just $ Min exp) *>)
|
||||
memcacheAuthHere :: Q Exp
|
||||
memcacheAuthHere = do
|
||||
loc <- location
|
||||
[e| \k -> fmap unMemcachedKeyedLoc . memcacheAuth (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
[e| \k -> withMemcachedKeyedLoc (memcacheAuth (loc, k)) |]
|
||||
|
||||
memcacheAuthHere' :: Q Exp
|
||||
memcacheAuthHere' = do
|
||||
loc <- location
|
||||
[e| \exp k -> fmap unMemcachedKeyedLoc . memcacheAuth' exp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
[e| \exp k -> withMemcachedKeyedLoc (memcacheAuth' exp (loc, k)) |]
|
||||
|
||||
memcacheAuthHereMax :: Q Exp
|
||||
memcacheAuthHereMax = do
|
||||
loc <- location
|
||||
[e| \exp k -> fmap unMemcachedKeyedLoc . memcacheAuthMax exp (loc, k) . fmap MemcachedKeyedLoc |]
|
||||
[e| \exp k -> withMemcachedKeyedLoc (memcacheAuthMax exp (loc, k)) |]
|
||||
|
||||
|
||||
|
||||
@ -576,7 +679,7 @@ memcachedTimeout :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
)
|
||||
=> Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a)
|
||||
memcachedTimeout mExp = memcachedTimeoutWith (memcachedGet, memcachedSet mExp)
|
||||
@ -585,7 +688,7 @@ memcachedTimeoutBy :: ( MonadHandler m, HandlerSite m ~ UniWorX
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a)
|
||||
@ -606,7 +709,7 @@ memcachedLimitedTimeout :: forall a k'' m.
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
-> Word64 -- ^ avg. inverse rate (usec/token)
|
||||
@ -623,7 +726,7 @@ memcachedLimitedKeyTimeout :: forall a k' k'' m.
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
)
|
||||
=> k'
|
||||
@ -642,7 +745,7 @@ memcachedLimitedTimeoutBy :: forall a k'' k m.
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Binary k
|
||||
)
|
||||
=> Word64 -- ^ burst-size (tokens)
|
||||
@ -661,7 +764,7 @@ memcachedLimitedKeyTimeoutBy :: forall a k' k'' k m.
|
||||
, MonadThrow m
|
||||
, MonadUnliftIO m
|
||||
, Typeable k'', Hashable k'', Eq k''
|
||||
, Typeable a, Binary a
|
||||
, Typeable a, Binary a, NFData a
|
||||
, Typeable k', Hashable k', Eq k'
|
||||
, Binary k
|
||||
)
|
||||
|
||||
@ -75,6 +75,9 @@ import Data.Monoid as Import (Last(..), First(..), Any(..), All(..),
|
||||
import Data.Binary as Import (Binary)
|
||||
import Data.Binary.Instances as Import ()
|
||||
|
||||
import Data.Dynamic as Import (Dynamic)
|
||||
import Data.Dynamic.Lens as Import
|
||||
|
||||
import System.FilePath as Import hiding (joinPath, normalise, isValid, makeValid)
|
||||
|
||||
import Numeric.Natural as Import (Natural)
|
||||
|
||||
@ -60,7 +60,7 @@ dispatchHealthCheckMatchingClusterConfig
|
||||
dbSetting <- clusterSetting @'ClusterId
|
||||
return $ Just ourSetting == dbSetting
|
||||
clusterSettingMatches ClusterMemcachedKey = do
|
||||
ourSetting <- getsYesod $ fmap fst . appMemcached
|
||||
ourSetting <- getsYesod $ fmap memcachedKey . appMemcached
|
||||
dbSetting <- clusterSetting @'ClusterMemcachedKey
|
||||
return $ maybe True ((== dbSetting) . Just) ourSetting
|
||||
clusterSettingMatches ClusterVerpSecret = do
|
||||
|
||||
@ -2,23 +2,14 @@ module Jobs.Offload
|
||||
( mkJobOffloadHandler
|
||||
) where
|
||||
|
||||
import Import hiding (bracket, js)
|
||||
import Import hiding (js)
|
||||
import Jobs.Types
|
||||
import Jobs.Queue
|
||||
|
||||
import qualified Database.PostgreSQL.Simple as PG
|
||||
import qualified Database.PostgreSQL.Simple.Types as PG
|
||||
import qualified Database.PostgreSQL.Simple.Notification as PG
|
||||
|
||||
import Database.Persist.Postgresql (PostgresConf, pgConnStr)
|
||||
import Utils.Postgresql
|
||||
|
||||
import Data.Text.Encoding (decodeUtf8')
|
||||
|
||||
import UnliftIO.Exception (bracket)
|
||||
|
||||
|
||||
jobOffloadChannel :: Text
|
||||
jobOffloadChannel = "job-offload"
|
||||
|
||||
mkJobOffloadHandler :: forall m.
|
||||
( MonadResource m
|
||||
@ -32,39 +23,21 @@ mkJobOffloadHandler dbConf jMode
|
||||
| not shouldListen = Nothing
|
||||
| otherwise = Just $ do
|
||||
jobOffloadOutgoing <- newTVarIO mempty
|
||||
jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do
|
||||
myPid <- liftIO $ PG.getBackendPID pgConn
|
||||
|
||||
when shouldListen $
|
||||
void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel)
|
||||
|
||||
foreverBreak $ \(($ ()) -> terminate) -> do
|
||||
UniWorX{appJobState} <- ask
|
||||
shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||
when shouldTerminate terminate
|
||||
|
||||
let
|
||||
getInput = do
|
||||
n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn
|
||||
if | notificationPid == myPid || notificationChannel /= encodeUtf8 jobOffloadChannel -> getInput
|
||||
| otherwise -> return n
|
||||
getOutput = atomically $ do
|
||||
jQueue <- readTVar jobOffloadOutgoing
|
||||
case jQueue of
|
||||
j :< js -> j <$ writeTVar jobOffloadOutgoing js
|
||||
_other -> mzero
|
||||
|
||||
io <- lift $ if
|
||||
| shouldListen -> getInput `race` getOutput
|
||||
| otherwise -> Right <$> getOutput
|
||||
|
||||
case io of
|
||||
Left PG.Notification{..}
|
||||
| Just jId <- fromPathPiece =<< either (const Nothing) Just (decodeUtf8' notificationData)
|
||||
-> writeJobCtl $ JobCtlPerform jId
|
||||
| otherwise
|
||||
-> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow notificationData
|
||||
Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId)
|
||||
|
||||
jobOffloadHandler <- allocateAsync $ managePostgresqlChannel dbConf ChannelJobOffload PostgresqlChannelManager
|
||||
{ pgcTerminate = do
|
||||
UniWorX{appJobState} <- ask
|
||||
atomically $ do
|
||||
shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown
|
||||
guardOn shouldTerminate ()
|
||||
, pgcOnInput = Just $ \inpBS -> case fromPathPiece =<< either (const Nothing) Just (decodeUtf8' inpBS) of
|
||||
Nothing -> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow inpBS
|
||||
Just jId -> writeJobCtl $ JobCtlPerform jId
|
||||
, pgcGenOutput = atomically $ do
|
||||
jQueue <- readTVar jobOffloadOutgoing
|
||||
j <- case jQueue of
|
||||
j :< js -> j <$ writeTVar jobOffloadOutgoing js
|
||||
_other -> mzero
|
||||
return . encodeUtf8 $ toPathPiece j
|
||||
}
|
||||
return JobOffloadHandler{..}
|
||||
where shouldListen = has (_jobsAcceptOffload . only True) jMode
|
||||
|
||||
@ -93,7 +93,7 @@ data AuthTag -- sortiert nach gewünschter Reihenfolge auf /authpreds, d.h. Prä
|
||||
| AuthDevelopment
|
||||
| AuthFree
|
||||
deriving (Eq, Ord, Enum, Bounded, Read, Show, Data, Generic, Typeable)
|
||||
deriving anyclass (Universe, Finite, Hashable)
|
||||
deriving anyclass (Universe, Finite, Hashable, NFData)
|
||||
|
||||
nullaryPathPiece ''AuthTag $ camelToPathPiece' 1
|
||||
pathPieceJSON ''AuthTag
|
||||
@ -157,7 +157,7 @@ _ReducedActiveAuthTags = iso toReducedActiveAuthTags fromReducedActiveAuthTags
|
||||
|
||||
data PredLiteral a = PLVariable { plVar :: a } | PLNegated { plVar :: a }
|
||||
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
|
||||
deriving anyclass (Hashable, Binary)
|
||||
deriving anyclass (Hashable, Binary, NFData)
|
||||
|
||||
makeLenses_ ''PredLiteral
|
||||
makePrisms ''PredLiteral
|
||||
@ -178,7 +178,7 @@ instance PathPiece a => PathPiece (PredLiteral a) where
|
||||
|
||||
newtype PredDNF a = PredDNF { dnfTerms :: Set (NonNull (Set (PredLiteral a))) }
|
||||
deriving (Eq, Ord, Read, Show, Data, Generic, Typeable)
|
||||
deriving anyclass (Binary, Hashable)
|
||||
deriving anyclass (Binary, Hashable, NFData)
|
||||
|
||||
makeLenses_ ''PredDNF
|
||||
|
||||
|
||||
@ -205,6 +205,7 @@ data WorkflowRole userid
|
||||
| WorkflowRoleAuthorized { workflowRoleAuthorized :: AuthDNF }
|
||||
| WorkflowRoleInitiator
|
||||
deriving (Eq, Ord, Show, Read, Data, Generic, Typeable)
|
||||
deriving anyclass (NFData)
|
||||
|
||||
|
||||
----- WORKFLOW GRAPH: PAYLOAD SPECIFICATION -----
|
||||
@ -343,7 +344,7 @@ data WorkflowScope termid schoolid courseid
|
||||
| WSTermSchool { wisTerm :: termid, wisSchool :: schoolid }
|
||||
| WSCourse { wisCourse :: courseid }
|
||||
deriving (Eq, Ord, Show, Read, Data, Generic, Typeable)
|
||||
deriving anyclass (Hashable)
|
||||
deriving anyclass (Hashable, NFData)
|
||||
|
||||
data WorkflowScope'
|
||||
= WSGlobal' | WSTerm' | WSSchool' | WSTermSchool' | WSCourse'
|
||||
@ -363,6 +364,7 @@ classifyWorkflowScope = \case
|
||||
newtype WorkflowPayloadLabel = WorkflowPayloadLabel { unWorkflowPayloadLabel :: CI Text }
|
||||
deriving stock (Eq, Ord, Show, Read, Data, Generic, Typeable)
|
||||
deriving newtype (IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey, PathPiece, PersistField, Binary)
|
||||
deriving anyclass (NFData)
|
||||
|
||||
instance PersistFieldSql WorkflowPayloadLabel where
|
||||
sqlType _ = sqlType $ Proxy @(CI Text)
|
||||
|
||||
@ -186,6 +186,8 @@ data AppSettings = AppSettings
|
||||
, appCookieSettings :: RegisteredCookie -> CookieSettings
|
||||
|
||||
, appMemcachedConf :: Maybe MemcachedConf
|
||||
, appMemcacheAuth :: Bool
|
||||
, appMemcachedLocalConf :: Maybe (ARCConf Int)
|
||||
|
||||
, appUploadCacheConf :: Maybe Minio.ConnectInfo
|
||||
, appUploadCacheBucket, appUploadTmpBucket :: Minio.Bucket
|
||||
@ -215,8 +217,6 @@ data AppSettings = AppSettings
|
||||
, appStudyFeaturesRecacheRelevanceWithin :: Maybe NominalDiffTime
|
||||
, appStudyFeaturesRecacheRelevanceInterval :: NominalDiffTime
|
||||
|
||||
, appMemcacheAuth :: Bool
|
||||
|
||||
, appFileSourceARCConf :: Maybe (ARCConf Int)
|
||||
, appFileSourcePrewarmConf :: Maybe PrewarmCacheConf
|
||||
|
||||
@ -533,12 +533,15 @@ instance FromJSON AppSettings where
|
||||
]
|
||||
appWidgetMemcachedConf <- assertM validWidgetMemcachedConf <$> o .:? "widget-memcached"
|
||||
appSessionMemcachedConf <- assertM validMemcachedConf <$> o .:? "session-memcached"
|
||||
appMemcachedConf <- assertM validMemcachedConf <$> o .:? "memcached"
|
||||
appRoot <- o .:? "approot" .!= const Nothing
|
||||
appHost <- fromString <$> o .: "host"
|
||||
appPort <- o .: "port"
|
||||
appIpFromHeader <- o .: "ip-from-header"
|
||||
|
||||
appMemcachedConf <- assertM validMemcachedConf <$> o .:? "memcached"
|
||||
appMemcacheAuth <- o .:? "memcache-auth" .!= False
|
||||
appMemcachedLocalConf <- assertM isValidARCConf <$> o .:? "memcached-local"
|
||||
|
||||
appMailFrom <- o .: "mail-from"
|
||||
appMailEnvelopeFrom <- o .:? "mail-envelope-from" .!= addressEmail appMailFrom
|
||||
appMailSender <- o .:? "mail-sender" .!= appMailFrom
|
||||
@ -654,14 +657,11 @@ instance FromJSON AppSettings where
|
||||
|
||||
appDownloadTokenExpire <- o .: "download-token-expire"
|
||||
|
||||
appMemcacheAuth <- o .:? "memcache-auth" .!= False
|
||||
|
||||
appJobMode <- o .:? "job-mode" .!= JobsLocal True
|
||||
|
||||
appStudyFeaturesRecacheRelevanceWithin <- o .:? "study-features-recache-relevance-within"
|
||||
appStudyFeaturesRecacheRelevanceInterval <- o .: "study-features-recache-relevance-interval"
|
||||
|
||||
let isValidARCConf ARCConf{..} = arccMaximumWeight > 0
|
||||
appFileSourceARCConf <- assertM isValidARCConf <$> o .:? "file-source-arc"
|
||||
|
||||
let isValidPrewarmConf PrewarmCacheConf{..} = and
|
||||
@ -676,6 +676,7 @@ instance FromJSON AppSettings where
|
||||
appBotMitigations <- o .:? "bot-mitigations" .!= Set.empty
|
||||
|
||||
return AppSettings{..}
|
||||
where isValidARCConf ARCConf{..} = arccMaximumWeight > 0
|
||||
|
||||
makeClassy_ ''AppSettings
|
||||
|
||||
|
||||
117
src/Utils/ARC.hs
117
src/Utils/ARC.hs
@ -8,19 +8,44 @@ module Utils.ARC
|
||||
, arcRecentSize, arcFrequentSize, arcGhostRecentSize, arcGhostFrequentSize
|
||||
, getARCRecentWeight, getARCFrequentWeight
|
||||
, describeARC
|
||||
, NFDynamic(..), _NFDynamic, DynARC, DynARCHandle
|
||||
) where
|
||||
|
||||
import ClassyPrelude
|
||||
|
||||
import Data.OrdPSQ (OrdPSQ)
|
||||
import qualified Data.OrdPSQ as OrdPSQ
|
||||
import Data.HashPSQ (HashPSQ)
|
||||
import qualified Data.HashPSQ as HashPSQ
|
||||
|
||||
import Control.Lens
|
||||
|
||||
import Type.Reflection
|
||||
import Text.Show (showString)
|
||||
|
||||
import Data.Hashable (Hashed, hashed)
|
||||
|
||||
-- https://web.archive.org/web/20210115184012/https://dbs.uni-leipzig.de/file/ARC.pdf
|
||||
-- https://jaspervdj.be/posts/2015-02-24-lru-cache.html
|
||||
|
||||
|
||||
data NFDynamic where
|
||||
NFDynamic :: forall a. NFData a => TypeRep a -> a -> NFDynamic
|
||||
|
||||
_NFDynamic :: forall a. (Typeable a, NFData a) => Prism' NFDynamic a
|
||||
_NFDynamic = prism' toNFDyn fromNFDynamic
|
||||
where
|
||||
toNFDyn v = NFDynamic typeRep v
|
||||
fromNFDynamic (NFDynamic t v)
|
||||
| Just HRefl <- t `eqTypeRep` rep = Just v
|
||||
| otherwise = Nothing
|
||||
where rep = typeRep :: TypeRep a
|
||||
|
||||
instance NFData NFDynamic where
|
||||
rnf (NFDynamic t v) = rnfTypeRep t `seq` rnf v
|
||||
|
||||
instance Show NFDynamic where
|
||||
showsPrec _ (NFDynamic t _) = showString "<<" . showsPrec 0 t . showString ">>"
|
||||
|
||||
|
||||
newtype ARCTick = ARCTick { _getARCTick :: Word64 }
|
||||
deriving (Eq, Ord, Show, Typeable)
|
||||
deriving newtype (NFData)
|
||||
@ -28,13 +53,15 @@ newtype ARCTick = ARCTick { _getARCTick :: Word64 }
|
||||
makeLenses ''ARCTick
|
||||
|
||||
data ARC k w v = ARC
|
||||
{ arcRecent, arcFrequent :: !(OrdPSQ k ARCTick (v, w))
|
||||
, arcGhostRecent, arcGhostFrequent :: !(OrdPSQ k ARCTick ())
|
||||
{ arcRecent, arcFrequent :: !(HashPSQ (Hashed k) ARCTick (v, w))
|
||||
, arcGhostRecent, arcGhostFrequent :: !(HashPSQ (Hashed k) ARCTick ())
|
||||
, arcRecentWeight, arcFrequentWeight :: !w
|
||||
, arcTargetRecent, arcMaximumWeight :: !w
|
||||
, arcMaximumGhost :: !Int
|
||||
}
|
||||
|
||||
type DynARC k w = ARC (SomeTypeRep, k) w NFDynamic
|
||||
|
||||
instance (NFData k, NFData w, NFData v) => NFData (ARC k w v) where
|
||||
rnf ARC{..} = rnf arcRecent
|
||||
`seq` rnf arcFrequent
|
||||
@ -50,10 +77,10 @@ describeARC :: Show w
|
||||
=> ARC k w v
|
||||
-> String
|
||||
describeARC ARC{..} = intercalate ", "
|
||||
[ "arcRecent: " <> show (OrdPSQ.size arcRecent)
|
||||
, "arcFrequent: " <> show (OrdPSQ.size arcFrequent)
|
||||
, "arcGhostRecent: " <> show (OrdPSQ.size arcGhostRecent)
|
||||
, "arcGhostFrequent: " <> show (OrdPSQ.size arcGhostFrequent)
|
||||
[ "arcRecent: " <> show (HashPSQ.size arcRecent)
|
||||
, "arcFrequent: " <> show (HashPSQ.size arcFrequent)
|
||||
, "arcGhostRecent: " <> show (HashPSQ.size arcGhostRecent)
|
||||
, "arcGhostFrequent: " <> show (HashPSQ.size arcGhostFrequent)
|
||||
, "arcRecentWeight: " <> show arcRecentWeight
|
||||
, "arcFrequentWeight: " <> show arcFrequentWeight
|
||||
, "arcTargetRecent: " <> show arcTargetRecent
|
||||
@ -62,10 +89,10 @@ describeARC ARC{..} = intercalate ", "
|
||||
]
|
||||
|
||||
arcRecentSize, arcFrequentSize, arcGhostRecentSize, arcGhostFrequentSize :: ARC k w v -> Int
|
||||
arcRecentSize = OrdPSQ.size . arcRecent
|
||||
arcFrequentSize = OrdPSQ.size . arcFrequent
|
||||
arcGhostRecentSize = OrdPSQ.size . arcGhostRecent
|
||||
arcGhostFrequentSize = OrdPSQ.size . arcGhostFrequent
|
||||
arcRecentSize = HashPSQ.size . arcRecent
|
||||
arcFrequentSize = HashPSQ.size . arcFrequent
|
||||
arcGhostRecentSize = HashPSQ.size . arcGhostRecent
|
||||
arcGhostFrequentSize = HashPSQ.size . arcGhostFrequent
|
||||
|
||||
getARCRecentWeight, getARCFrequentWeight :: ARC k w v -> w
|
||||
getARCRecentWeight = arcRecentWeight
|
||||
@ -83,10 +110,10 @@ initARC arcMaximumGhost arcMaximumWeight
|
||||
| arcMaximumWeight < 0 = error "initARC given negative maximum weight"
|
||||
| arcMaximumGhost < 0 = error "initARC given negative maximum ghost size"
|
||||
| otherwise = (, initialARCTick) ARC
|
||||
{ arcRecent = OrdPSQ.empty
|
||||
, arcFrequent = OrdPSQ.empty
|
||||
, arcGhostRecent = OrdPSQ.empty
|
||||
, arcGhostFrequent = OrdPSQ.empty
|
||||
{ arcRecent = HashPSQ.empty
|
||||
, arcFrequent = HashPSQ.empty
|
||||
, arcGhostRecent = HashPSQ.empty
|
||||
, arcGhostFrequent = HashPSQ.empty
|
||||
, arcRecentWeight = 0
|
||||
, arcFrequentWeight = 0
|
||||
, arcMaximumWeight
|
||||
@ -103,7 +130,7 @@ infixl 6 |-
|
||||
|
||||
|
||||
arcAlterF :: forall f k w v.
|
||||
( Ord k
|
||||
( Ord k, Hashable k
|
||||
, Functor f
|
||||
, Integral w
|
||||
)
|
||||
@ -112,57 +139,57 @@ arcAlterF :: forall f k w v.
|
||||
-> ARC k w v
|
||||
-> ARCTick -> f (ARC k w v, ARCTick)
|
||||
-- | Unchecked precondition: item weights are always less than `arcMaximumWeight`
|
||||
arcAlterF k f oldARC@ARC{..} now
|
||||
| later <= initialARCTick = uncurry (arcAlterF k f) $ initARC arcMaximumGhost arcMaximumWeight
|
||||
arcAlterF unhashedK@(hashed -> k) f oldARC@ARC{..} now
|
||||
| later <= initialARCTick = uncurry (arcAlterF unhashedK f) $ initARC arcMaximumGhost arcMaximumWeight
|
||||
| otherwise = (, later) <$> if
|
||||
| Just (_p, x@(_, w), arcFrequent') <- OrdPSQ.deleteView k arcFrequent
|
||||
| Just (_p, x@(_, w), arcFrequent') <- HashPSQ.deleteView k arcFrequent
|
||||
-> f (Just x) <&> \(fromMaybe x -> x'@(_, w'))
|
||||
-> let (arcFrequent'', arcFrequentWeight'', arcGhostFrequent') = evictToSize (arcMaximumWeight |- arcTargetRecent |- w') arcFrequent' (arcFrequentWeight - w) arcGhostFrequent
|
||||
in oldARC
|
||||
{ arcFrequent = OrdPSQ.insert k now x' arcFrequent''
|
||||
{ arcFrequent = HashPSQ.insert k now x' arcFrequent''
|
||||
, arcFrequentWeight = arcFrequentWeight'' + w'
|
||||
, arcGhostFrequent = arcGhostFrequent'
|
||||
}
|
||||
| Just (_p, x@(_, w), arcRecent') <- OrdPSQ.deleteView k arcRecent
|
||||
| Just (_p, x@(_, w), arcRecent') <- HashPSQ.deleteView k arcRecent
|
||||
-> f (Just x) <&> \(fromMaybe x -> x'@(_, w'))
|
||||
-> let (arcFrequent', arcFrequentWeight', arcGhostFrequent') = evictToSize (arcMaximumWeight |- arcTargetRecent |- w') arcFrequent arcFrequentWeight arcGhostFrequent
|
||||
in oldARC
|
||||
{ arcRecent = arcRecent'
|
||||
, arcRecentWeight = arcRecentWeight - w
|
||||
, arcFrequent = OrdPSQ.insert k now x' arcFrequent'
|
||||
, arcFrequent = HashPSQ.insert k now x' arcFrequent'
|
||||
, arcFrequentWeight = arcFrequentWeight' + w'
|
||||
, arcGhostFrequent = arcGhostFrequent'
|
||||
}
|
||||
| Just (_p, (), arcGhostRecent') <- OrdPSQ.deleteView k arcGhostRecent
|
||||
| Just (_p, (), arcGhostRecent') <- HashPSQ.deleteView k arcGhostRecent
|
||||
-> f Nothing <&> \case
|
||||
Nothing -> oldARC
|
||||
{ arcGhostRecent = OrdPSQ.insert k now () arcGhostRecent'
|
||||
{ arcGhostRecent = HashPSQ.insert k now () arcGhostRecent'
|
||||
}
|
||||
Just x@(_, w)
|
||||
-> let arcTargetRecent' = min arcMaximumWeight $ arcTargetRecent + max avgWeight (round $ toRational (OrdPSQ.size arcGhostFrequent) / toRational (OrdPSQ.size arcGhostRecent) * toRational avgWeight)
|
||||
-> let arcTargetRecent' = min arcMaximumWeight $ arcTargetRecent + max avgWeight (round $ toRational (HashPSQ.size arcGhostFrequent) / toRational (HashPSQ.size arcGhostRecent) * toRational avgWeight)
|
||||
(arcFrequent', arcFrequentWeight', arcGhostFrequent') = evictToSize (arcMaximumWeight |- arcTargetRecent' |- w) arcFrequent arcFrequentWeight arcGhostFrequent
|
||||
(arcRecent', arcRecentWeight', arcGhostRecent'') = evictToSize (max arcTargetRecent' $ arcMaximumWeight |- arcFrequentWeight' |- w) arcRecent arcRecentWeight arcGhostRecent'
|
||||
in oldARC
|
||||
{ arcRecent = arcRecent'
|
||||
, arcFrequent = OrdPSQ.insert k now x arcFrequent'
|
||||
, arcFrequent = HashPSQ.insert k now x arcFrequent'
|
||||
, arcGhostRecent = arcGhostRecent''
|
||||
, arcGhostFrequent = arcGhostFrequent'
|
||||
, arcRecentWeight = arcRecentWeight'
|
||||
, arcFrequentWeight = arcFrequentWeight' + w
|
||||
, arcTargetRecent = arcTargetRecent'
|
||||
}
|
||||
| Just (_p, (), arcGhostFrequent') <- OrdPSQ.deleteView k arcGhostFrequent
|
||||
| Just (_p, (), arcGhostFrequent') <- HashPSQ.deleteView k arcGhostFrequent
|
||||
-> f Nothing <&> \case
|
||||
Nothing -> oldARC
|
||||
{ arcGhostFrequent = OrdPSQ.insert k now () arcGhostFrequent'
|
||||
{ arcGhostFrequent = HashPSQ.insert k now () arcGhostFrequent'
|
||||
}
|
||||
Just x@(_, w)
|
||||
-> let arcTargetRecent' = arcTargetRecent |- max avgWeight (round $ toRational (OrdPSQ.size arcGhostRecent) / toRational (OrdPSQ.size arcGhostFrequent) * toRational avgWeight)
|
||||
-> let arcTargetRecent' = arcTargetRecent |- max avgWeight (round $ toRational (HashPSQ.size arcGhostRecent) / toRational (HashPSQ.size arcGhostFrequent) * toRational avgWeight)
|
||||
(arcFrequent', arcFrequentWeight', arcGhostFrequent'') = evictToSize (arcMaximumWeight |- arcTargetRecent' |- w) arcFrequent arcFrequentWeight arcGhostFrequent'
|
||||
(arcRecent', arcRecentWeight', arcGhostRecent') = evictToSize (max arcTargetRecent' $ arcMaximumWeight |- arcFrequentWeight' |- w) arcRecent arcRecentWeight arcGhostRecent
|
||||
in oldARC
|
||||
{ arcRecent = arcRecent'
|
||||
, arcFrequent = OrdPSQ.insert k now x arcFrequent'
|
||||
, arcFrequent = HashPSQ.insert k now x arcFrequent'
|
||||
, arcGhostRecent = arcGhostRecent'
|
||||
, arcGhostFrequent = arcGhostFrequent''
|
||||
, arcRecentWeight = arcRecentWeight'
|
||||
@ -171,35 +198,35 @@ arcAlterF k f oldARC@ARC{..} now
|
||||
}
|
||||
| otherwise -> f Nothing <&> \case
|
||||
Nothing -> oldARC
|
||||
{ arcGhostRecent = OrdPSQ.insert k now () $ evictGhostToCount arcGhostRecent
|
||||
{ arcGhostRecent = HashPSQ.insert k now () $ evictGhostToCount arcGhostRecent
|
||||
}
|
||||
Just x@(_, w)
|
||||
-> let (arcRecent', arcRecentWeight', arcGhostRecent') = evictToSize (max arcTargetRecent (arcMaximumWeight |- arcFrequentWeight) |- w) arcRecent arcRecentWeight arcGhostRecent
|
||||
in oldARC
|
||||
{ arcRecent = OrdPSQ.insert k now x arcRecent'
|
||||
{ arcRecent = HashPSQ.insert k now x arcRecent'
|
||||
, arcRecentWeight = arcRecentWeight' + w
|
||||
, arcGhostRecent = arcGhostRecent'
|
||||
}
|
||||
where
|
||||
avgWeight = round $ toRational (arcRecentWeight + arcFrequentWeight) / toRational (OrdPSQ.size arcFrequent + OrdPSQ.size arcRecent)
|
||||
avgWeight = round $ toRational (arcRecentWeight + arcFrequentWeight) / toRational (HashPSQ.size arcFrequent + HashPSQ.size arcRecent)
|
||||
|
||||
later :: ARCTick
|
||||
later = over getARCTick succ now
|
||||
|
||||
evictToSize :: w -> OrdPSQ k ARCTick (v, w) -> w -> OrdPSQ k ARCTick () -> (OrdPSQ k ARCTick (v, w), w, OrdPSQ k ARCTick ())
|
||||
evictToSize :: w -> HashPSQ (Hashed k) ARCTick (v, w) -> w -> HashPSQ (Hashed k) ARCTick () -> (HashPSQ (Hashed k) ARCTick (v, w), w, HashPSQ (Hashed k) ARCTick ())
|
||||
evictToSize tSize c cSize ghostC
|
||||
| cSize <= tSize = (c, cSize, ghostC)
|
||||
| Just (k', p', (_, w'), c') <- OrdPSQ.minView c = evictToSize tSize c' (cSize - w') . evictGhostToCount $ OrdPSQ.insert k' p' () ghostC
|
||||
| Just (k', p', (_, w'), c') <- HashPSQ.minView c = evictToSize tSize c' (cSize - w') . evictGhostToCount $ HashPSQ.insert k' p' () ghostC
|
||||
| otherwise = error "evictToSize: cannot reach required size through eviction"
|
||||
|
||||
evictGhostToCount :: OrdPSQ k ARCTick () -> OrdPSQ k ARCTick ()
|
||||
evictGhostToCount :: HashPSQ (Hashed k) ARCTick () -> HashPSQ (Hashed k) ARCTick ()
|
||||
evictGhostToCount c
|
||||
| OrdPSQ.size c <= arcMaximumGhost = c
|
||||
| Just (_, _, _, c') <- OrdPSQ.minView c = evictGhostToCount c'
|
||||
| HashPSQ.size c <= arcMaximumGhost = c
|
||||
| Just (_, _, _, c') <- HashPSQ.minView c = evictGhostToCount c'
|
||||
| otherwise = error "evictGhostToCount: cannot reach required count through eviction"
|
||||
|
||||
lookupARC :: forall k w v.
|
||||
( Ord k
|
||||
( Ord k, Hashable k
|
||||
, Integral w
|
||||
)
|
||||
=> k
|
||||
@ -208,7 +235,7 @@ lookupARC :: forall k w v.
|
||||
lookupARC k = getConst . uncurry (arcAlterF k Const)
|
||||
|
||||
insertARC :: forall k w v.
|
||||
( Ord k
|
||||
( Ord k, Hashable k
|
||||
, Integral w
|
||||
)
|
||||
=> k
|
||||
@ -221,6 +248,8 @@ insertARC k newVal = (runIdentity .) . arcAlterF k (const $ pure newVal)
|
||||
newtype ARCHandle k w v = ARCHandle { _getARCHandle :: IORef (ARC k w v, ARCTick) }
|
||||
deriving (Eq, Typeable)
|
||||
|
||||
type DynARCHandle k w = ARCHandle (SomeTypeRep, k) w NFDynamic
|
||||
|
||||
initARCHandle :: forall k w v m.
|
||||
( MonadIO m
|
||||
, Integral w
|
||||
@ -232,7 +261,7 @@ initARCHandle maxGhost maxWeight = fmap ARCHandle . newIORef $ initARC maxGhost
|
||||
|
||||
cachedARC' :: forall k w v m.
|
||||
( MonadIO m
|
||||
, Ord k
|
||||
, Ord k, Hashable k
|
||||
, Integral w
|
||||
, NFData k, NFData w, NFData v
|
||||
)
|
||||
@ -261,7 +290,7 @@ cachedARC' (ARCHandle arcVar) k f = do
|
||||
|
||||
cachedARC :: forall k w v m.
|
||||
( MonadIO m
|
||||
, Ord k
|
||||
, Ord k, Hashable k
|
||||
, Integral w
|
||||
, NFData k, NFData w, NFData v
|
||||
)
|
||||
@ -273,7 +302,7 @@ cachedARC h k f = fromMaybe (error "cachedARC: cachedARC' returned Nothing") <$>
|
||||
|
||||
lookupARCHandle :: forall k w v m.
|
||||
( MonadIO m
|
||||
, Ord k
|
||||
, Ord k, Hashable k
|
||||
, Integral w
|
||||
)
|
||||
=> ARCHandle k w v
|
||||
|
||||
@ -57,7 +57,7 @@ data I18n a = I18n
|
||||
, i18nFallbackLang :: Maybe Lang
|
||||
, i18nTranslations :: Map Lang a
|
||||
} deriving (Eq, Ord, Read, Show, Functor, Foldable, Traversable, Data, Generic, Typeable)
|
||||
deriving anyclass (MonoFunctor, MonoFoldable, MonoTraversable, Binary)
|
||||
deriving anyclass (MonoFunctor, MonoFoldable, MonoTraversable, Binary, NFData)
|
||||
type instance Element (I18n a) = a
|
||||
|
||||
type I18nText = I18n Text
|
||||
|
||||
@ -272,7 +272,7 @@ relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lb
|
||||
|
||||
data ARCMetrics = ARCMetrics
|
||||
|
||||
data ARCLabel = ARCFileSource
|
||||
data ARCLabel = ARCFileSource | ARCMemcachedLocal
|
||||
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
||||
deriving anyclass (Universe, Finite)
|
||||
|
||||
|
||||
67
src/Utils/Postgresql.hs
Normal file
67
src/Utils/Postgresql.hs
Normal file
@ -0,0 +1,67 @@
|
||||
module Utils.Postgresql
|
||||
( PostgresqlChannel(..)
|
||||
, PostgresqlChannelManager(..)
|
||||
, managePostgresqlChannel
|
||||
, PostgresConf
|
||||
) where
|
||||
|
||||
import Import.NoFoundation hiding (bracket)
|
||||
|
||||
import qualified Database.PostgreSQL.Simple as PG
|
||||
import qualified Database.PostgreSQL.Simple.Types as PG
|
||||
import qualified Database.PostgreSQL.Simple.Notification as PG
|
||||
|
||||
import Database.Persist.Postgresql (PostgresConf, pgConnStr)
|
||||
|
||||
import UnliftIO.Exception (bracket)
|
||||
|
||||
|
||||
data PostgresqlChannel
|
||||
= ChannelJobOffload
|
||||
| ChannelMemcachedLocalInvalidation
|
||||
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
||||
deriving anyclass (Universe, Finite)
|
||||
|
||||
nullaryPathPiece ''PostgresqlChannel $ camelToPathPiece' 1
|
||||
|
||||
|
||||
data PostgresqlChannelManager m a = PostgresqlChannelManager
|
||||
{ pgcTerminate :: m a -- ^ Expected to block; used within `race`
|
||||
, pgcOnInput :: Maybe (ByteString -> m ())
|
||||
, pgcGenOutput :: m ByteString -- ^ Expected to block; used within `race`
|
||||
}
|
||||
|
||||
managePostgresqlChannel :: forall m a.
|
||||
( MonadUnliftIO m
|
||||
, MonadLogger m
|
||||
)
|
||||
=> PostgresConf
|
||||
-> PostgresqlChannel
|
||||
-> PostgresqlChannelManager m a
|
||||
-> m a
|
||||
managePostgresqlChannel dbConf (toPathPiece -> chan) PostgresqlChannelManager{..} = bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do
|
||||
myPid <- liftIO $ PG.getBackendPID pgConn
|
||||
when (is _Just pgcOnInput) $
|
||||
void . liftIO . PG.execute pgConn "LISTEN ?" . PG.Only $ PG.Identifier chan
|
||||
|
||||
let
|
||||
getInput = do
|
||||
n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn
|
||||
if | notificationPid == myPid || notificationChannel /= encodeUtf8 chan -> getInput
|
||||
| otherwise -> return n
|
||||
|
||||
foreverBreak $ \terminate -> do
|
||||
io <- lift . (pgcTerminate `race`) $ if
|
||||
| is _Just pgcOnInput -> getInput `race` pgcGenOutput
|
||||
| otherwise -> Right <$> pgcGenOutput
|
||||
|
||||
case io of
|
||||
Right (Left notif@PG.Notification{..}) -> do
|
||||
$logDebugS "PGChannel" $ "Got input: " <> tshow notif
|
||||
lift $ maybe (return ()) ($ notificationData) pgcOnInput
|
||||
Right (Right o) -> do
|
||||
void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier chan, o)
|
||||
$logDebugS "PGChannel" $ "Sent output: " <> tshow o
|
||||
Left t -> do
|
||||
$logDebugS "PGChannel" "Terminating..."
|
||||
terminate t
|
||||
@ -81,3 +81,4 @@ instance site ~ site' => ToWidget site (SomeMessage site') where
|
||||
|
||||
deriving instance Generic AuthResult
|
||||
instance Binary AuthResult
|
||||
instance NFData AuthResult
|
||||
|
||||
Loading…
Reference in New Issue
Block a user