diff --git a/config/settings.yml b/config/settings.yml index ea6d0dd97..024a72945 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -157,6 +157,10 @@ memcached: limit: "_env:MEMCACHED_LIMIT:1024" timeout: "_env:MEMCACHED_TIMEOUT:20" expiration: "_env:MEMCACHED_EXPIRATION:300" +memcache-auth: true +memcached-local: + maximum-ghost: 512 + maximum-weight: 104857600 # 100MiB upload-cache: host: "_env:UPLOAD_S3_HOST:" @@ -269,8 +273,6 @@ fallback-personalised-sheet-files-keys-expire: 2419200 download-token-expire: 604801 -memcache-auth: true - file-source-arc: maximum-ghost: 512 maximum-weight: 1073741824 # 1GiB diff --git a/src/Application.hs b/src/Application.hs index a14c41403..bcaf1edda 100644 --- a/src/Application.hs +++ b/src/Application.hs @@ -112,6 +112,9 @@ import qualified Data.IntervalMap.Strict as IntervalMap import qualified Utils.Pool as Custom +import Utils.Postgresql +import Handler.Utils.Memcached (manageMemcachedLocalInvalidations) + -- Import all relevant handler modules here. -- (HPack takes care to add new modules to our cabal file nowadays.) import Handler.News @@ -208,7 +211,7 @@ makeFoundation appSettings''@AppSettings{..} = do -- from there, and then create the real foundation. let mkFoundation :: _ -> (forall m. MonadIO m => Custom.Pool' m DBConnLabel DBConnUseState SqlBackend) -> _ - mkFoundation appSettings' appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey = UniWorX {..} + mkFoundation appSettings' appConnPool appSmtpPool appLdapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appMemcachedLocal appUploadCache appVerpSecret appAuthKey = UniWorX {..} -- The UniWorX {..} syntax is an example of record wild cards. For more -- information, see: -- https://ocharles.org.uk/blog/posts/2014-12-04-record-wildcards.html @@ -224,6 +227,7 @@ makeFoundation appSettings''@AppSettings{..} = do (error "JSONWebKeySet forced in tempFoundation") (error "ClusterID forced in tempFoundation") (error "memcached forced in tempFoundation") + (error "memcachedLocal forced in tempFoundation") (error "MinioConn forced in tempFoundation") (error "VerpSecret forced in tempFoundation") (error "AuthKey forced in tempFoundation") @@ -295,11 +299,17 @@ makeFoundation appSettings''@AppSettings{..} = do appMemcached <- for appMemcachedConf $ \memcachedConf -> do $logDebugS "setup" "Memcached" memcachedKey <- clusterSetting (Proxy :: Proxy 'ClusterMemcachedKey) `customRunSqlPool` sqlPool - memcached <- createMemcached memcachedConf + memcachedConn <- createMemcached memcachedConf when appClearCache $ do $logWarnS "setup" "Clearing memcached" - liftIO $ Memcached.flushAll memcached - return (memcachedKey, memcached) + liftIO $ Memcached.flushAll memcachedConn + return AppMemcached{..} + appMemcachedLocal <- for appMemcachedLocalConf $ \ARCConf{..} -> do + memcachedLocalARC <- initARCHandle arccMaximumGhost arccMaximumWeight + void . Prometheus.register $ arcMetrics ARCMemcachedLocal memcachedLocalARC + memcachedLocalInvalidationQueue <- newTVarIO mempty + memcachedLocalHandleInvalidations <- allocateLinkedAsync . managePostgresqlChannel appDatabaseConf ChannelMemcachedLocalInvalidation $ manageMemcachedLocalInvalidations memcachedLocalARC memcachedLocalInvalidationQueue + return AppMemcachedLocal{..} appSessionStore <- mkSessionStore appSettings'' sqlPool `customRunSqlPool` sqlPool @@ -314,7 +324,7 @@ makeFoundation appSettings''@AppSettings{..} = do $logDebugS "Runtime configuration" $ tshow appSettings' - let foundation = mkFoundation appSettings' sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appUploadCache appVerpSecret appAuthKey + let foundation = mkFoundation appSettings' sqlPool smtpPool ldapPool appCryptoIDKey appSessionStore appSecretBoxKey appWidgetMemcached appJSONWebKeySet appClusterID appMemcached appMemcachedLocal appUploadCache appVerpSecret appAuthKey -- Return the foundation $logDebugS "setup" "Done" @@ -671,7 +681,7 @@ shutdownApp app = do for_ (appSmtpPool app) destroyAllResources for_ (appLdapPool app) . mapFailover $ views _2 destroyAllResources for_ (appWidgetMemcached app) Memcached.close - for_ (appMemcached app) $ views _2 Memcached.close + for_ (appMemcached app) $ views _memcachedConn Memcached.close release . fst $ appLogger app liftIO $ threadDelay 1e6 diff --git a/src/Database/Persist/Sql/Types/Instances.hs b/src/Database/Persist/Sql/Types/Instances.hs index cc7219bc3..4fd50d57a 100644 --- a/src/Database/Persist/Sql/Types/Instances.hs +++ b/src/Database/Persist/Sql/Types/Instances.hs @@ -24,3 +24,4 @@ instance BackendCompatible SqlWriteBackend SqlBackend where projectBackend = SqlWriteBackend deriving newtype instance Binary (BackendKey SqlBackend) +deriving anyclass instance NFData (BackendKey SqlBackend) diff --git a/src/Foundation/Authorization.hs b/src/Foundation/Authorization.hs index a318ca559..cd0c34b01 100644 --- a/src/Foundation/Authorization.hs +++ b/src/Foundation/Authorization.hs @@ -157,7 +157,7 @@ instance (MonadHandler m, HandlerSite m ~ UniWorX, BackendCompatible SqlReadBack -- cacheAP mExp k mkV cont = APBind $ \mAuthId route isWrite -> either (return . Left) (fmap Right) . cont mAuthId route isWrite =<< memcachedBy mExp k mkV cacheAPDB :: ( Binary k - , Typeable v, Binary v + , Typeable v, Binary v, NFData v ) => Maybe Expiry -> k @@ -185,7 +185,7 @@ cacheAPDB mExp k mkV cont = APBindDB $ \mAuthId route isWrite -> do -- Nothing -> either (return . Left) (fmap Right) $ cont mAuthId route isWrite Nothing cacheAPDB' :: ( Binary k - , Typeable v, Binary v + , Typeable v, Binary v, NFData v ) => Maybe Expiry -> (Maybe (AuthId UniWorX) -> Route UniWorX -> Bool -> Maybe (k, ReaderT SqlReadBackend (HandlerFor UniWorX) v)) diff --git a/src/Foundation/Type.hs b/src/Foundation/Type.hs index 6f9fb8091..3b7494d3c 100644 --- a/src/Foundation/Type.hs +++ b/src/Foundation/Type.hs @@ -5,6 +5,10 @@ module Foundation.Type ( UniWorX(..) , SomeSessionStorage(..) , _SessionStorageMemcachedSql, _SessionStorageAcid + , AppMemcached(..) + , _memcachedKey, _memcachedConn + , AppMemcachedLocal(..) + , _memcachedLocalARC , SMTPPool , _appSettings', _appStatic, _appConnPool, _appSmtpPool, _appLdapPool, _appWidgetMemcached, _appHttpManager, _appLogger, _appLogSettings, _appCryptoIDKey, _appClusterID, _appInstanceID, _appJobState, _appSessionStore, _appSecretBoxKey, _appJSONWebKeySet, _appHealthReport, _appMemcached, _appUploadCache, _appVerpSecret, _appAuthKey , DB, Form, MsgRenderer, MailM, DBFile @@ -30,15 +34,35 @@ import qualified Utils.Pool as Custom import Utils.Metrics (DBConnUseState) +import qualified Data.ByteString.Lazy as Lazy +import Data.Time.Clock.POSIX (POSIXTime) +import GHC.Fingerprint (Fingerprint) + type SMTPPool = Pool SMTPConnection data SomeSessionStorage = SessionStorageMemcachedSql { sessionStorageMemcachedSql :: MemcachedSqlStorage SessionMap } | SessionStorageAcid { sessionStorageAcid :: AcidStorage SessionMap } + deriving (Generic, Typeable) makePrisms ''SomeSessionStorage +data AppMemcached = AppMemcached + { memcachedKey :: AEAD.Key + , memcachedConn :: Memcached.Connection + } deriving (Generic, Typeable) + +makeLenses_ ''AppMemcached + +data AppMemcachedLocal = AppMemcachedLocal + { memcachedLocalARC :: ARCHandle (Fingerprint, Lazy.ByteString) Int (NFDynamic, Maybe POSIXTime) + , memcachedLocalHandleInvalidations :: Async () + , memcachedLocalInvalidationQueue :: TVar (Seq (Fingerprint, Lazy.ByteString)) + } deriving (Generic, Typeable) + +makeLenses_ ''AppMemcachedLocal + -- | The foundation datatype for your application. This can be a good place to -- keep settings and values requiring initialization before your application -- starts running, such as database connections. Every handler will have @@ -61,14 +85,15 @@ data UniWorX = UniWorX , appSecretBoxKey :: SecretBox.Key , appJSONWebKeySet :: Jose.JwkSet , appHealthReport :: TVar (Set (UTCTime, HealthReport)) - , appMemcached :: Maybe (AEAD.Key, Memcached.Connection) + , appMemcached :: Maybe AppMemcached + , appMemcachedLocal :: Maybe AppMemcachedLocal , appUploadCache :: Maybe MinioConn , appVerpSecret :: VerpSecret , appAuthKey :: Auth.Key , appFileSourceARC :: Maybe (ARCHandle (FileContentChunkReference, (Int, Int)) Int ByteString) , appFileSourcePrewarm :: Maybe (LRUHandle (FileContentChunkReference, (Int, Int)) UTCTime Int ByteString) , appFileInjectInhibit :: TVar (IntervalMap UTCTime (Set FileContentReference)) - } + } deriving (Typeable) makeLenses_ ''UniWorX instance HasInstanceID UniWorX InstanceId where diff --git a/src/Handler/Utils.hs b/src/Handler/Utils.hs index 12ee98805..76b3496f1 100644 --- a/src/Handler/Utils.hs +++ b/src/Handler/Utils.hs @@ -22,7 +22,7 @@ import Handler.Utils.I18n as Handler.Utils import Handler.Utils.Widgets as Handler.Utils import Handler.Utils.Database as Handler.Utils import Handler.Utils.Occurrences as Handler.Utils -import Handler.Utils.Memcached as Handler.Utils +import Handler.Utils.Memcached as Handler.Utils hiding (manageMemcachedLocalInvalidations) import Handler.Utils.Files as Handler.Utils import Handler.Utils.Download as Handler.Utils diff --git a/src/Handler/Utils/Memcached.hs b/src/Handler/Utils/Memcached.hs index 58a081b36..de06f6103 100644 --- a/src/Handler/Utils/Memcached.hs +++ b/src/Handler/Utils/Memcached.hs @@ -1,9 +1,12 @@ +{-# LANGUAGE DuplicateRecordFields #-} + module Handler.Utils.Memcached ( memcachedAvailable , memcached, memcachedBy , memcachedHere, memcachedByHere , memcachedSet, memcachedGet , memcachedInvalidate, memcachedByInvalidate + , manageMemcachedLocalInvalidations , memcachedByGet, memcachedBySet , memcachedTimeout, memcachedTimeoutBy , memcachedTimeoutHere, memcachedTimeoutByHere @@ -35,9 +38,11 @@ import Crypto.Hash.Algorithms (SHAKE256) import qualified Data.ByteArray as BA +import qualified Data.ByteString.Base64 as Base64 + import Language.Haskell.TH hiding (Type) -import Data.Typeable (typeRep) +import Data.Typeable (typeRep, typeRepFingerprint) import Type.Reflection (typeOf, TypeRep) import qualified Type.Reflection as Refl (typeRep) import Data.Type.Equality (TestEquality(..)) @@ -58,6 +63,12 @@ import qualified Control.Monad.State.Class as State import qualified Data.ByteString.Lazy as Lazy (ByteString) +import GHC.Fingerprint + +import Utils.Postgresql + +import UnliftIO.Concurrent (threadDelay) + type Expiry = Either UTCTime DiffTime @@ -112,7 +123,7 @@ putMemcachedValue MemcachedValue{..} = do putExpiry mExpiry Binary.putByteString mCiphertext -getMemcachedValue :: Binary.Get MemcachedValue +getMemcachedValue, getMemcachedValueNoExpiry :: Binary.Get MemcachedValue getMemcachedValue = do Binary.lookAhead . Binary.label "length check" $ do void . Binary.getByteString $ Saltine.secretBoxNonce + 4 + Saltine.secretBoxMac @@ -120,8 +131,6 @@ getMemcachedValue = do mExpiry <- getExpiry mCiphertext <- Binary.label "ciphertext" $ toStrict <$> Binary.getRemainingLazyByteString return MemcachedValue{..} - -getMemcachedValueNoExpiry :: Binary.Get MemcachedValue getMemcachedValueNoExpiry = do Binary.lookAhead . Binary.label "length check" $ do void . Binary.getByteString $ Saltine.secretBoxNonce + 4 + Saltine.secretBoxMac @@ -130,7 +139,6 @@ getMemcachedValueNoExpiry = do mCiphertext <- Binary.label "ciphertext" $ toStrict <$> Binary.getRemainingLazyByteString return MemcachedValue{..} - memcachedAvailable :: ( MonadHandler m, HandlerSite m ~ UniWorX ) => m Bool @@ -143,9 +151,9 @@ data MemcachedException = MemcachedException Memcached.MemcachedException deriving anyclass (Exception) -memcachedKey :: Typeable a +toMemcachedKey :: Typeable a => AEAD.Key -> Proxy a -> Lazy.ByteString -> ByteString -memcachedKey (Saltine.encode -> kmacKey) p = BA.convert . kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey +toMemcachedKey (Saltine.encode -> kmacKey) p = BA.convert . kmaclazy @(SHAKE256 256) (encodeUtf8 . tshow $ typeRep p) kmacKey memcachedAAD :: ByteString -> Maybe POSIXTime -> ByteString memcachedAAD cKey mExpiry = toStrict . Binary.runPut $ do @@ -154,18 +162,30 @@ memcachedAAD cKey mExpiry = toStrict . Binary.runPut $ do memcachedByGet :: forall a k m. ( MonadHandler m, HandlerSite m ~ UniWorX - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => k -> m (Maybe a) -memcachedByGet (Binary.encode -> k) = runMaybeT $ requestCache <|> memcache +memcachedByGet (Binary.encode -> k) = runMaybeT $ arc <|> memcache where - requestCache = MaybeT . cacheByGet $ toStrict k + arc = do + AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal + res <- hoistMaybe . preview (_1 . _NFDynamic) <=< hoistMaybe <=< cachedARC' memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) $ \mPrev -> do + prev@((_, prevExpiry), _) <- hoistMaybe mPrev + $logDebugS "memcached" "Cache hit (local ARC)" + lift . runMaybeT $ do -- To delete from ARC upon expiry + for_ prevExpiry $ \expiry -> do + now <- liftIO getPOSIXTime + guard $ expiry > now + return prev + $logDebugS "memcached" "All valid (local ARC)" + return res memcache = do - (aeadKey, conn) <- MaybeT $ getsYesod appMemcached - let cKey = memcachedKey aeadKey (Proxy @a) k + AppMemcached{..} <- MaybeT $ getsYesod appMemcached + localARC <- getsYesod appMemcachedLocal + let cKey = toMemcachedKey memcachedKey (Proxy @a) k - encVal <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey conn + encVal <- fmap toStrict . hoist liftIO . catchMaybeT (Proxy @Memcached.MemcachedException) $ Memcached.get_ cKey memcachedConn $logDebugS "memcached" "Cache hit" @@ -177,11 +197,20 @@ memcachedByGet (Binary.encode -> k) = runMaybeT $ requestCache <|> memcache guard $ expiry > now + clockLeniency $logDebugS "memcached" $ "Expiry valid: " <> tshow mExpiry let aad = memcachedAAD cKey mExpiry - decrypted <- hoistMaybe $ AEAD.aeadOpen aeadKey mNonce mCiphertext aad + decrypted <- hoistMaybe $ AEAD.aeadOpen memcachedKey mNonce mCiphertext aad $logDebugS "memcached" $ "Decryption valid " <> bool "without" "with" doExp <> " expiration" + + let withCache = case localARC of + Just AppMemcachedLocal{..} -> cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) + Nothing -> fmap (view _1) . ($ Nothing) + res <- hoistMaybe . preview (_1 . _NFDynamic) <=< withCache $ \case + Nothing -> fmap ((, length decrypted) . (, mExpiry) . review (_NFDynamic @a)) . hoistMaybe $ runGetMaybe Binary.get decrypted + Just p -> return p - hoistMaybe $ runGetMaybe Binary.get decrypted + $logDebugS "memcached" "All valid" + + return res withExp True <|> withExp False where @@ -194,50 +223,112 @@ memcachedByGet (Binary.encode -> k) = runMaybeT $ requestCache <|> memcache memcachedBySet :: forall a k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => Maybe Expiry -> k -> a -> m () memcachedBySet mExp (Binary.encode -> k) v = do mExp' <- for mExp $ \exp -> maybe (throwM $ MemcachedInvalidExpiry exp) return $ exp ^? _MemcachedExpiry + + let decrypted = toStrict $ Binary.encode v + mExpiry <- for mExp $ \case + Left uTime -> return $ utcTimeToPOSIXSeconds uTime + Right diff -> liftIO $ (+ realToFrac diff) <$> getPOSIXTime + mConn <- getsYesod appMemcached - for_ mConn $ \(aeadKey, conn) -> do + for_ mConn $ \AppMemcached{..} -> do mNonce <- liftIO AEAD.newNonce - mExpiry <- for mExp $ \case - Left uTime -> return $ utcTimeToPOSIXSeconds uTime - Right diff -> liftIO $ (+ realToFrac diff) <$> getPOSIXTime - let cKey = memcachedKey aeadKey (Proxy @a) k + let cKey = toMemcachedKey memcachedKey (Proxy @a) k aad = memcachedAAD cKey mExpiry - mCiphertext = AEAD.aead aeadKey mNonce (toStrict $ Binary.encode v) aad - liftIO $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (Binary.runPut $ putMemcachedValue MemcachedValue{..}) conn - cacheBySet (toStrict k) v + mCiphertext = AEAD.aead memcachedKey mNonce decrypted aad + liftIO $ Memcached.set zeroBits (fromMaybe zeroBits mExp') cKey (Binary.runPut $ putMemcachedValue MemcachedValue{..}) memcachedConn $logDebugS "memcached" $ "Cache store: " <> tshow mExpiry + mLocal <- getsYesod appMemcachedLocal + for_ mLocal $ \AppMemcachedLocal{..} -> do + void . cachedARC memcachedLocalARC (typeRepFingerprint . typeRep $ Proxy @a, k) . const $ return ((_NFDynamic # v, mExpiry), length decrypted) + $logDebugS "memcached" $ "Cache store: " <> tshow mExpiry <> " (local ARC)" + -- DEBUG + let inv = Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..} + where mLocalInvalidateType = typeRepFingerprint . typeRep $ Proxy @a + mLocalInvalidateKey = k + $logDebugS "memcached" $ "To invalidate remotely: " <> tshow inv + memcachedByInvalidate :: forall a k m p. ( MonadHandler m, HandlerSite m ~ UniWorX , Typeable a , Binary k ) => k -> p a -> m () -memcachedByInvalidate (Binary.encode -> k) _ = maybeT_ $ do - (aeadKey, conn) <- MaybeT $ getsYesod appMemcached - let cKey = memcachedKey aeadKey (Proxy @a) k - hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey conn +memcachedByInvalidate (Binary.encode -> k) _ = arc >> memcache + where + memcache = maybeT_ $ do + AppMemcached{..} <- MaybeT $ getsYesod appMemcached + let cKey = toMemcachedKey memcachedKey (Proxy @a) k + hoist liftIO . catchIfMaybeT Memcached.isKeyNotFound $ Memcached.delete cKey memcachedConn + $logDebugS "memcached" "Cache invalidation" + arc = maybeT_ $ do + AppMemcachedLocal{..} <- MaybeT $ getsYesod appMemcachedLocal + let arcKey = (typeRepFingerprint . typeRep $ Proxy @a, k) + atomically $ modifyTVar' memcachedLocalInvalidationQueue (:> arcKey) + void . cachedARC' memcachedLocalARC arcKey . const $ return Nothing + $logDebugS "memcached" "Cache invalidation (local ARC)" + +data MemcachedLocalInvalidateMsg = MemcachedLocalInvalidateMsg + { mLocalInvalidateType :: Fingerprint + , mLocalInvalidateKey :: Lazy.ByteString + } deriving (Eq, Ord, Show, Typeable) + +instance Binary MemcachedLocalInvalidateMsg where + get = Binary.label "MemcachedLocalInvalidateMsg" $ do + mLocalInvalidateType <- Binary.label "mLocalInvalidateType" $ Fingerprint <$> Binary.getWord64le <*> Binary.getWord64le + mLocalInvalidateKey <- Binary.label "mLocalInvalidateKey" Binary.getRemainingLazyByteString + return MemcachedLocalInvalidateMsg{..} + put MemcachedLocalInvalidateMsg{..} = do + let Fingerprint w1 w2 = mLocalInvalidateType + Binary.putWord64le w1 + Binary.putWord64le w2 + Binary.putLazyByteString mLocalInvalidateKey + +manageMemcachedLocalInvalidations :: ( MonadUnliftIO m + , MonadLogger m + ) + => ARCHandle (Fingerprint, Lazy.ByteString) Int (NFDynamic, Maybe POSIXTime) + -> TVar (Seq (Fingerprint, Lazy.ByteString)) + -> PostgresqlChannelManager m () +manageMemcachedLocalInvalidations localARC iQueue = PostgresqlChannelManager + { pgcTerminate = forever $ threadDelay maxBound + , pgcOnInput = Just $ \inpBS -> case Binary.runGetOrFail Binary.get . fromStrict <$> Base64.decode inpBS of + Right (Right (bs', _, MemcachedLocalInvalidateMsg{..})) | null bs' -> + void . cachedARC' localARC (mLocalInvalidateType, mLocalInvalidateKey) $ \mPrev -> do + $logDebugS "memcached" $ "Remote invalidation in local ARC: " <> bool "miss" "hit" (is _Just mPrev) + return Nothing + _other -> $logErrorS "memcached" $ "Received unparseable remote invalidation: " <> tshow inpBS + , pgcGenOutput = atomically $ do + iQueue' <- readTVar iQueue + i <- case iQueue' of + i :< is' -> i <$ writeTVar iQueue is' + _other -> mzero + let (mLocalInvalidateType, mLocalInvalidateKey) = i + return . Base64.encode . toStrict $ Binary.encode MemcachedLocalInvalidateMsg{..} + } newtype MemcachedUnkeyed a = MemcachedUnkeyed { unMemcachedUnkeyed :: a } deriving (Typeable) deriving newtype (Eq, Ord, Show, Binary) +instance NFData a => NFData (MemcachedUnkeyed a) where + rnf = rnf . unMemcachedUnkeyed memcachedGet :: ( MonadHandler m, HandlerSite m ~ UniWorX - , Typeable a, Binary a + , Typeable a, Binary a, NFData a ) => m (Maybe a) memcachedGet = fmap unMemcachedUnkeyed <$> memcachedByGet () memcachedSet :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a ) => Maybe Expiry -> a -> m () memcachedSet mExp = memcachedBySet mExp () . MemcachedUnkeyed @@ -260,14 +351,14 @@ memcachedWith (doGet, doSet) act = do memcached :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a ) => Maybe Expiry -> m a -> m a memcached mExp = memcachedWith (memcachedGet, \x -> x <$ memcachedSet mExp x) memcachedBy :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => Maybe Expiry -> k -> m a -> m a @@ -277,6 +368,8 @@ memcachedBy mExp k = memcachedWith (memcachedByGet k, \x -> x <$ memcachedBySet newtype MemcachedUnkeyedLoc a = MemcachedUnkeyedLoc { unMemcachedUnkeyedLoc :: a } deriving (Typeable) deriving newtype (Eq, Ord, Show, Binary) +instance NFData a => NFData (MemcachedUnkeyedLoc a) where + rnf MemcachedUnkeyedLoc{..} = rnf unMemcachedUnkeyedLoc memcachedHere :: Q Exp memcachedHere = do @@ -286,11 +379,21 @@ memcachedHere = do newtype MemcachedKeyedLoc a = MemcachedKeyedLoc { unMemcachedKeyedLoc :: a } deriving (Typeable) deriving newtype (Eq, Ord, Show, Binary) +instance NFData a => NFData (MemcachedKeyedLoc a) where + rnf MemcachedKeyedLoc{..} = rnf unMemcachedKeyedLoc + +withMemcachedKeyedLoc :: Functor f => (f (MemcachedKeyedLoc a) -> f (MemcachedKeyedLoc a)) -> (f a -> f a) +withMemcachedKeyedLoc act = fmap unMemcachedKeyedLoc . act . fmap MemcachedKeyedLoc +{-# INLINE withMemcachedKeyedLoc #-} + +withMemcachedKeyedLoc' :: (Functor f, Functor f') => (f (MemcachedKeyedLoc a) -> f (f' (MemcachedKeyedLoc a))) -> (f a -> f (f' a)) +withMemcachedKeyedLoc' act = fmap (fmap unMemcachedKeyedLoc) . act . fmap MemcachedKeyedLoc +{-# INLINE withMemcachedKeyedLoc' #-} memcachedByHere :: Q Exp memcachedByHere = do loc <- location - [e| \mExp k -> fmap unMemcachedKeyedLoc . memcachedBy mExp (loc, k) . fmap MemcachedKeyedLoc |] + [e| \mExp k -> withMemcachedKeyedLoc (memcachedBy mExp (loc, k)) |] data HashableDynamic = forall a. (Hashable a, Eq a) => HashableDynamic !(TypeRep a) !a @@ -348,7 +451,7 @@ memcachedLimitedWith (doGet, doSet) liftAct (hashableDynamic -> lK) burst rate t memcachedLimited :: forall a m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a ) => Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) @@ -361,7 +464,7 @@ memcachedLimited burst rate tokens mExp = memcachedLimitedWith (memcachedGet, me memcachedLimitedKey :: forall a k' m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Typeable k', Hashable k', Eq k' ) => k' @@ -376,7 +479,7 @@ memcachedLimitedKey lK burst rate tokens mExp = memcachedLimitedWith (memcachedG memcachedLimitedBy :: forall a k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => Word64 -- ^ burst-size (tokens) @@ -391,7 +494,7 @@ memcachedLimitedBy burst rate tokens mExp k = memcachedLimitedWith (memcachedByG memcachedLimitedKeyBy :: forall a k' k m. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Typeable k', Hashable k', Eq k' , Binary k ) @@ -418,18 +521,18 @@ memcachedLimitedKeyHere = do memcachedLimitedByHere :: Q Exp memcachedLimitedByHere = do loc <- location - [e| \burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedBy burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |] + [e| \burst rate tokens mExp k -> withMemcachedKeyedLoc' (memcachedLimitedBy burst rate tokens mExp (loc, k)) |] memcachedLimitedKeyByHere :: Q Exp memcachedLimitedKeyByHere = do loc <- location - [e| \lK burst rate tokens mExp k -> fmap (fmap unMemcachedKeyedLoc) . memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k) . fmap MemcachedKeyedLoc |] + [e| \lK burst rate tokens mExp k -> withMemcachedKeyedLoc' (memcachedLimitedKeyBy lK burst rate tokens mExp (loc, k)) |] memcacheAuth :: forall m k a. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => k @@ -450,7 +553,7 @@ memcacheAuth k mx = cachedByBinary k $ do memcacheAuth' :: forall m k a. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => Expiry @@ -462,7 +565,7 @@ memcacheAuth' exp k = memcacheAuth k . (<* tell (Just $ Min exp)) . lift memcacheAuthMax :: forall m k a. ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => Expiry @@ -474,17 +577,17 @@ memcacheAuthMax exp k = memcacheAuth k . (tell (Just $ Min exp) *>) memcacheAuthHere :: Q Exp memcacheAuthHere = do loc <- location - [e| \k -> fmap unMemcachedKeyedLoc . memcacheAuth (loc, k) . fmap MemcachedKeyedLoc |] + [e| \k -> withMemcachedKeyedLoc (memcacheAuth (loc, k)) |] memcacheAuthHere' :: Q Exp memcacheAuthHere' = do loc <- location - [e| \exp k -> fmap unMemcachedKeyedLoc . memcacheAuth' exp (loc, k) . fmap MemcachedKeyedLoc |] + [e| \exp k -> withMemcachedKeyedLoc (memcacheAuth' exp (loc, k)) |] memcacheAuthHereMax :: Q Exp memcacheAuthHereMax = do loc <- location - [e| \exp k -> fmap unMemcachedKeyedLoc . memcacheAuthMax exp (loc, k) . fmap MemcachedKeyedLoc |] + [e| \exp k -> withMemcachedKeyedLoc (memcacheAuthMax exp (loc, k)) |] @@ -576,7 +679,7 @@ memcachedTimeout :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' - , Typeable a, Binary a + , Typeable a, Binary a, NFData a ) => Maybe Expiry -> DiffTime -> k'' -> m a -> m (Maybe a) memcachedTimeout mExp = memcachedTimeoutWith (memcachedGet, memcachedSet mExp) @@ -585,7 +688,7 @@ memcachedTimeoutBy :: ( MonadHandler m, HandlerSite m ~ UniWorX , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => Maybe Expiry -> DiffTime -> k'' -> k -> m a -> m (Maybe a) @@ -606,7 +709,7 @@ memcachedLimitedTimeout :: forall a k'' m. , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' - , Typeable a, Binary a + , Typeable a, Binary a, NFData a ) => Word64 -- ^ burst-size (tokens) -> Word64 -- ^ avg. inverse rate (usec/token) @@ -623,7 +726,7 @@ memcachedLimitedKeyTimeout :: forall a k' k'' m. , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Typeable k', Hashable k', Eq k' ) => k' @@ -642,7 +745,7 @@ memcachedLimitedTimeoutBy :: forall a k'' k m. , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Binary k ) => Word64 -- ^ burst-size (tokens) @@ -661,7 +764,7 @@ memcachedLimitedKeyTimeoutBy :: forall a k' k'' k m. , MonadThrow m , MonadUnliftIO m , Typeable k'', Hashable k'', Eq k'' - , Typeable a, Binary a + , Typeable a, Binary a, NFData a , Typeable k', Hashable k', Eq k' , Binary k ) diff --git a/src/Import/NoModel.hs b/src/Import/NoModel.hs index 8491e2f60..c8854d786 100644 --- a/src/Import/NoModel.hs +++ b/src/Import/NoModel.hs @@ -75,6 +75,9 @@ import Data.Monoid as Import (Last(..), First(..), Any(..), All(..), import Data.Binary as Import (Binary) import Data.Binary.Instances as Import () +import Data.Dynamic as Import (Dynamic) +import Data.Dynamic.Lens as Import + import System.FilePath as Import hiding (joinPath, normalise, isValid, makeValid) import Numeric.Natural as Import (Natural) diff --git a/src/Jobs/HealthReport.hs b/src/Jobs/HealthReport.hs index a3bfb46b4..83c685700 100644 --- a/src/Jobs/HealthReport.hs +++ b/src/Jobs/HealthReport.hs @@ -60,7 +60,7 @@ dispatchHealthCheckMatchingClusterConfig dbSetting <- clusterSetting @'ClusterId return $ Just ourSetting == dbSetting clusterSettingMatches ClusterMemcachedKey = do - ourSetting <- getsYesod $ fmap fst . appMemcached + ourSetting <- getsYesod $ fmap memcachedKey . appMemcached dbSetting <- clusterSetting @'ClusterMemcachedKey return $ maybe True ((== dbSetting) . Just) ourSetting clusterSettingMatches ClusterVerpSecret = do diff --git a/src/Jobs/Offload.hs b/src/Jobs/Offload.hs index 1176823e9..8eeaf657d 100644 --- a/src/Jobs/Offload.hs +++ b/src/Jobs/Offload.hs @@ -2,23 +2,14 @@ module Jobs.Offload ( mkJobOffloadHandler ) where -import Import hiding (bracket, js) +import Import hiding (js) import Jobs.Types import Jobs.Queue -import qualified Database.PostgreSQL.Simple as PG -import qualified Database.PostgreSQL.Simple.Types as PG -import qualified Database.PostgreSQL.Simple.Notification as PG - -import Database.Persist.Postgresql (PostgresConf, pgConnStr) +import Utils.Postgresql import Data.Text.Encoding (decodeUtf8') -import UnliftIO.Exception (bracket) - - -jobOffloadChannel :: Text -jobOffloadChannel = "job-offload" mkJobOffloadHandler :: forall m. ( MonadResource m @@ -32,39 +23,21 @@ mkJobOffloadHandler dbConf jMode | not shouldListen = Nothing | otherwise = Just $ do jobOffloadOutgoing <- newTVarIO mempty - jobOffloadHandler <- allocateAsync . bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do - myPid <- liftIO $ PG.getBackendPID pgConn - - when shouldListen $ - void . liftIO $ PG.execute pgConn "LISTEN ?" (PG.Only $ PG.Identifier jobOffloadChannel) - - foreverBreak $ \(($ ()) -> terminate) -> do - UniWorX{appJobState} <- ask - shouldTerminate <- atomically $ readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown - when shouldTerminate terminate - - let - getInput = do - n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn - if | notificationPid == myPid || notificationChannel /= encodeUtf8 jobOffloadChannel -> getInput - | otherwise -> return n - getOutput = atomically $ do - jQueue <- readTVar jobOffloadOutgoing - case jQueue of - j :< js -> j <$ writeTVar jobOffloadOutgoing js - _other -> mzero - - io <- lift $ if - | shouldListen -> getInput `race` getOutput - | otherwise -> Right <$> getOutput - - case io of - Left PG.Notification{..} - | Just jId <- fromPathPiece =<< either (const Nothing) Just (decodeUtf8' notificationData) - -> writeJobCtl $ JobCtlPerform jId - | otherwise - -> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow notificationData - Right jId -> void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier jobOffloadChannel, encodeUtf8 $ toPathPiece jId) - + jobOffloadHandler <- allocateAsync $ managePostgresqlChannel dbConf ChannelJobOffload PostgresqlChannelManager + { pgcTerminate = do + UniWorX{appJobState} <- ask + atomically $ do + shouldTerminate <- readTMVar appJobState >>= fmap not . isEmptyTMVar . jobShutdown + guardOn shouldTerminate () + , pgcOnInput = Just $ \inpBS -> case fromPathPiece =<< either (const Nothing) Just (decodeUtf8' inpBS) of + Nothing -> $logErrorS "JobOffloadHandler" $ "Could not parse incoming notification data: " <> tshow inpBS + Just jId -> writeJobCtl $ JobCtlPerform jId + , pgcGenOutput = atomically $ do + jQueue <- readTVar jobOffloadOutgoing + j <- case jQueue of + j :< js -> j <$ writeTVar jobOffloadOutgoing js + _other -> mzero + return . encodeUtf8 $ toPathPiece j + } return JobOffloadHandler{..} where shouldListen = has (_jobsAcceptOffload . only True) jMode diff --git a/src/Model/Types/Security.hs b/src/Model/Types/Security.hs index 756d69750..3e4715d61 100644 --- a/src/Model/Types/Security.hs +++ b/src/Model/Types/Security.hs @@ -93,7 +93,7 @@ data AuthTag -- sortiert nach gewünschter Reihenfolge auf /authpreds, d.h. Prä | AuthDevelopment | AuthFree deriving (Eq, Ord, Enum, Bounded, Read, Show, Data, Generic, Typeable) - deriving anyclass (Universe, Finite, Hashable) + deriving anyclass (Universe, Finite, Hashable, NFData) nullaryPathPiece ''AuthTag $ camelToPathPiece' 1 pathPieceJSON ''AuthTag @@ -157,7 +157,7 @@ _ReducedActiveAuthTags = iso toReducedActiveAuthTags fromReducedActiveAuthTags data PredLiteral a = PLVariable { plVar :: a } | PLNegated { plVar :: a } deriving (Eq, Ord, Read, Show, Data, Generic, Typeable) - deriving anyclass (Hashable, Binary) + deriving anyclass (Hashable, Binary, NFData) makeLenses_ ''PredLiteral makePrisms ''PredLiteral @@ -178,7 +178,7 @@ instance PathPiece a => PathPiece (PredLiteral a) where newtype PredDNF a = PredDNF { dnfTerms :: Set (NonNull (Set (PredLiteral a))) } deriving (Eq, Ord, Read, Show, Data, Generic, Typeable) - deriving anyclass (Binary, Hashable) + deriving anyclass (Binary, Hashable, NFData) makeLenses_ ''PredDNF diff --git a/src/Model/Types/Workflow.hs b/src/Model/Types/Workflow.hs index 2d7a7a496..8ab3510e1 100644 --- a/src/Model/Types/Workflow.hs +++ b/src/Model/Types/Workflow.hs @@ -205,6 +205,7 @@ data WorkflowRole userid | WorkflowRoleAuthorized { workflowRoleAuthorized :: AuthDNF } | WorkflowRoleInitiator deriving (Eq, Ord, Show, Read, Data, Generic, Typeable) + deriving anyclass (NFData) ----- WORKFLOW GRAPH: PAYLOAD SPECIFICATION ----- @@ -343,7 +344,7 @@ data WorkflowScope termid schoolid courseid | WSTermSchool { wisTerm :: termid, wisSchool :: schoolid } | WSCourse { wisCourse :: courseid } deriving (Eq, Ord, Show, Read, Data, Generic, Typeable) - deriving anyclass (Hashable) + deriving anyclass (Hashable, NFData) data WorkflowScope' = WSGlobal' | WSTerm' | WSSchool' | WSTermSchool' | WSCourse' @@ -363,6 +364,7 @@ classifyWorkflowScope = \case newtype WorkflowPayloadLabel = WorkflowPayloadLabel { unWorkflowPayloadLabel :: CI Text } deriving stock (Eq, Ord, Show, Read, Data, Generic, Typeable) deriving newtype (IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey, PathPiece, PersistField, Binary) + deriving anyclass (NFData) instance PersistFieldSql WorkflowPayloadLabel where sqlType _ = sqlType $ Proxy @(CI Text) diff --git a/src/Settings.hs b/src/Settings.hs index 9006adba0..deac6d484 100644 --- a/src/Settings.hs +++ b/src/Settings.hs @@ -186,6 +186,8 @@ data AppSettings = AppSettings , appCookieSettings :: RegisteredCookie -> CookieSettings , appMemcachedConf :: Maybe MemcachedConf + , appMemcacheAuth :: Bool + , appMemcachedLocalConf :: Maybe (ARCConf Int) , appUploadCacheConf :: Maybe Minio.ConnectInfo , appUploadCacheBucket, appUploadTmpBucket :: Minio.Bucket @@ -215,8 +217,6 @@ data AppSettings = AppSettings , appStudyFeaturesRecacheRelevanceWithin :: Maybe NominalDiffTime , appStudyFeaturesRecacheRelevanceInterval :: NominalDiffTime - , appMemcacheAuth :: Bool - , appFileSourceARCConf :: Maybe (ARCConf Int) , appFileSourcePrewarmConf :: Maybe PrewarmCacheConf @@ -533,12 +533,15 @@ instance FromJSON AppSettings where ] appWidgetMemcachedConf <- assertM validWidgetMemcachedConf <$> o .:? "widget-memcached" appSessionMemcachedConf <- assertM validMemcachedConf <$> o .:? "session-memcached" - appMemcachedConf <- assertM validMemcachedConf <$> o .:? "memcached" appRoot <- o .:? "approot" .!= const Nothing appHost <- fromString <$> o .: "host" appPort <- o .: "port" appIpFromHeader <- o .: "ip-from-header" + appMemcachedConf <- assertM validMemcachedConf <$> o .:? "memcached" + appMemcacheAuth <- o .:? "memcache-auth" .!= False + appMemcachedLocalConf <- assertM isValidARCConf <$> o .:? "memcached-local" + appMailFrom <- o .: "mail-from" appMailEnvelopeFrom <- o .:? "mail-envelope-from" .!= addressEmail appMailFrom appMailSender <- o .:? "mail-sender" .!= appMailFrom @@ -654,14 +657,11 @@ instance FromJSON AppSettings where appDownloadTokenExpire <- o .: "download-token-expire" - appMemcacheAuth <- o .:? "memcache-auth" .!= False - appJobMode <- o .:? "job-mode" .!= JobsLocal True appStudyFeaturesRecacheRelevanceWithin <- o .:? "study-features-recache-relevance-within" appStudyFeaturesRecacheRelevanceInterval <- o .: "study-features-recache-relevance-interval" - let isValidARCConf ARCConf{..} = arccMaximumWeight > 0 appFileSourceARCConf <- assertM isValidARCConf <$> o .:? "file-source-arc" let isValidPrewarmConf PrewarmCacheConf{..} = and @@ -676,6 +676,7 @@ instance FromJSON AppSettings where appBotMitigations <- o .:? "bot-mitigations" .!= Set.empty return AppSettings{..} + where isValidARCConf ARCConf{..} = arccMaximumWeight > 0 makeClassy_ ''AppSettings diff --git a/src/Utils/ARC.hs b/src/Utils/ARC.hs index 0f1b94ee5..5ae8a724d 100644 --- a/src/Utils/ARC.hs +++ b/src/Utils/ARC.hs @@ -8,19 +8,44 @@ module Utils.ARC , arcRecentSize, arcFrequentSize, arcGhostRecentSize, arcGhostFrequentSize , getARCRecentWeight, getARCFrequentWeight , describeARC + , NFDynamic(..), _NFDynamic, DynARC, DynARCHandle ) where import ClassyPrelude -import Data.OrdPSQ (OrdPSQ) -import qualified Data.OrdPSQ as OrdPSQ +import Data.HashPSQ (HashPSQ) +import qualified Data.HashPSQ as HashPSQ import Control.Lens +import Type.Reflection +import Text.Show (showString) + +import Data.Hashable (Hashed, hashed) + -- https://web.archive.org/web/20210115184012/https://dbs.uni-leipzig.de/file/ARC.pdf -- https://jaspervdj.be/posts/2015-02-24-lru-cache.html +data NFDynamic where + NFDynamic :: forall a. NFData a => TypeRep a -> a -> NFDynamic + +_NFDynamic :: forall a. (Typeable a, NFData a) => Prism' NFDynamic a +_NFDynamic = prism' toNFDyn fromNFDynamic + where + toNFDyn v = NFDynamic typeRep v + fromNFDynamic (NFDynamic t v) + | Just HRefl <- t `eqTypeRep` rep = Just v + | otherwise = Nothing + where rep = typeRep :: TypeRep a + +instance NFData NFDynamic where + rnf (NFDynamic t v) = rnfTypeRep t `seq` rnf v + +instance Show NFDynamic where + showsPrec _ (NFDynamic t _) = showString "<<" . showsPrec 0 t . showString ">>" + + newtype ARCTick = ARCTick { _getARCTick :: Word64 } deriving (Eq, Ord, Show, Typeable) deriving newtype (NFData) @@ -28,13 +53,15 @@ newtype ARCTick = ARCTick { _getARCTick :: Word64 } makeLenses ''ARCTick data ARC k w v = ARC - { arcRecent, arcFrequent :: !(OrdPSQ k ARCTick (v, w)) - , arcGhostRecent, arcGhostFrequent :: !(OrdPSQ k ARCTick ()) + { arcRecent, arcFrequent :: !(HashPSQ (Hashed k) ARCTick (v, w)) + , arcGhostRecent, arcGhostFrequent :: !(HashPSQ (Hashed k) ARCTick ()) , arcRecentWeight, arcFrequentWeight :: !w , arcTargetRecent, arcMaximumWeight :: !w , arcMaximumGhost :: !Int } +type DynARC k w = ARC (SomeTypeRep, k) w NFDynamic + instance (NFData k, NFData w, NFData v) => NFData (ARC k w v) where rnf ARC{..} = rnf arcRecent `seq` rnf arcFrequent @@ -50,10 +77,10 @@ describeARC :: Show w => ARC k w v -> String describeARC ARC{..} = intercalate ", " - [ "arcRecent: " <> show (OrdPSQ.size arcRecent) - , "arcFrequent: " <> show (OrdPSQ.size arcFrequent) - , "arcGhostRecent: " <> show (OrdPSQ.size arcGhostRecent) - , "arcGhostFrequent: " <> show (OrdPSQ.size arcGhostFrequent) + [ "arcRecent: " <> show (HashPSQ.size arcRecent) + , "arcFrequent: " <> show (HashPSQ.size arcFrequent) + , "arcGhostRecent: " <> show (HashPSQ.size arcGhostRecent) + , "arcGhostFrequent: " <> show (HashPSQ.size arcGhostFrequent) , "arcRecentWeight: " <> show arcRecentWeight , "arcFrequentWeight: " <> show arcFrequentWeight , "arcTargetRecent: " <> show arcTargetRecent @@ -62,10 +89,10 @@ describeARC ARC{..} = intercalate ", " ] arcRecentSize, arcFrequentSize, arcGhostRecentSize, arcGhostFrequentSize :: ARC k w v -> Int -arcRecentSize = OrdPSQ.size . arcRecent -arcFrequentSize = OrdPSQ.size . arcFrequent -arcGhostRecentSize = OrdPSQ.size . arcGhostRecent -arcGhostFrequentSize = OrdPSQ.size . arcGhostFrequent +arcRecentSize = HashPSQ.size . arcRecent +arcFrequentSize = HashPSQ.size . arcFrequent +arcGhostRecentSize = HashPSQ.size . arcGhostRecent +arcGhostFrequentSize = HashPSQ.size . arcGhostFrequent getARCRecentWeight, getARCFrequentWeight :: ARC k w v -> w getARCRecentWeight = arcRecentWeight @@ -83,10 +110,10 @@ initARC arcMaximumGhost arcMaximumWeight | arcMaximumWeight < 0 = error "initARC given negative maximum weight" | arcMaximumGhost < 0 = error "initARC given negative maximum ghost size" | otherwise = (, initialARCTick) ARC - { arcRecent = OrdPSQ.empty - , arcFrequent = OrdPSQ.empty - , arcGhostRecent = OrdPSQ.empty - , arcGhostFrequent = OrdPSQ.empty + { arcRecent = HashPSQ.empty + , arcFrequent = HashPSQ.empty + , arcGhostRecent = HashPSQ.empty + , arcGhostFrequent = HashPSQ.empty , arcRecentWeight = 0 , arcFrequentWeight = 0 , arcMaximumWeight @@ -103,7 +130,7 @@ infixl 6 |- arcAlterF :: forall f k w v. - ( Ord k + ( Ord k, Hashable k , Functor f , Integral w ) @@ -112,57 +139,57 @@ arcAlterF :: forall f k w v. -> ARC k w v -> ARCTick -> f (ARC k w v, ARCTick) -- | Unchecked precondition: item weights are always less than `arcMaximumWeight` -arcAlterF k f oldARC@ARC{..} now - | later <= initialARCTick = uncurry (arcAlterF k f) $ initARC arcMaximumGhost arcMaximumWeight +arcAlterF unhashedK@(hashed -> k) f oldARC@ARC{..} now + | later <= initialARCTick = uncurry (arcAlterF unhashedK f) $ initARC arcMaximumGhost arcMaximumWeight | otherwise = (, later) <$> if - | Just (_p, x@(_, w), arcFrequent') <- OrdPSQ.deleteView k arcFrequent + | Just (_p, x@(_, w), arcFrequent') <- HashPSQ.deleteView k arcFrequent -> f (Just x) <&> \(fromMaybe x -> x'@(_, w')) -> let (arcFrequent'', arcFrequentWeight'', arcGhostFrequent') = evictToSize (arcMaximumWeight |- arcTargetRecent |- w') arcFrequent' (arcFrequentWeight - w) arcGhostFrequent in oldARC - { arcFrequent = OrdPSQ.insert k now x' arcFrequent'' + { arcFrequent = HashPSQ.insert k now x' arcFrequent'' , arcFrequentWeight = arcFrequentWeight'' + w' , arcGhostFrequent = arcGhostFrequent' } - | Just (_p, x@(_, w), arcRecent') <- OrdPSQ.deleteView k arcRecent + | Just (_p, x@(_, w), arcRecent') <- HashPSQ.deleteView k arcRecent -> f (Just x) <&> \(fromMaybe x -> x'@(_, w')) -> let (arcFrequent', arcFrequentWeight', arcGhostFrequent') = evictToSize (arcMaximumWeight |- arcTargetRecent |- w') arcFrequent arcFrequentWeight arcGhostFrequent in oldARC { arcRecent = arcRecent' , arcRecentWeight = arcRecentWeight - w - , arcFrequent = OrdPSQ.insert k now x' arcFrequent' + , arcFrequent = HashPSQ.insert k now x' arcFrequent' , arcFrequentWeight = arcFrequentWeight' + w' , arcGhostFrequent = arcGhostFrequent' } - | Just (_p, (), arcGhostRecent') <- OrdPSQ.deleteView k arcGhostRecent + | Just (_p, (), arcGhostRecent') <- HashPSQ.deleteView k arcGhostRecent -> f Nothing <&> \case Nothing -> oldARC - { arcGhostRecent = OrdPSQ.insert k now () arcGhostRecent' + { arcGhostRecent = HashPSQ.insert k now () arcGhostRecent' } Just x@(_, w) - -> let arcTargetRecent' = min arcMaximumWeight $ arcTargetRecent + max avgWeight (round $ toRational (OrdPSQ.size arcGhostFrequent) / toRational (OrdPSQ.size arcGhostRecent) * toRational avgWeight) + -> let arcTargetRecent' = min arcMaximumWeight $ arcTargetRecent + max avgWeight (round $ toRational (HashPSQ.size arcGhostFrequent) / toRational (HashPSQ.size arcGhostRecent) * toRational avgWeight) (arcFrequent', arcFrequentWeight', arcGhostFrequent') = evictToSize (arcMaximumWeight |- arcTargetRecent' |- w) arcFrequent arcFrequentWeight arcGhostFrequent (arcRecent', arcRecentWeight', arcGhostRecent'') = evictToSize (max arcTargetRecent' $ arcMaximumWeight |- arcFrequentWeight' |- w) arcRecent arcRecentWeight arcGhostRecent' in oldARC { arcRecent = arcRecent' - , arcFrequent = OrdPSQ.insert k now x arcFrequent' + , arcFrequent = HashPSQ.insert k now x arcFrequent' , arcGhostRecent = arcGhostRecent'' , arcGhostFrequent = arcGhostFrequent' , arcRecentWeight = arcRecentWeight' , arcFrequentWeight = arcFrequentWeight' + w , arcTargetRecent = arcTargetRecent' } - | Just (_p, (), arcGhostFrequent') <- OrdPSQ.deleteView k arcGhostFrequent + | Just (_p, (), arcGhostFrequent') <- HashPSQ.deleteView k arcGhostFrequent -> f Nothing <&> \case Nothing -> oldARC - { arcGhostFrequent = OrdPSQ.insert k now () arcGhostFrequent' + { arcGhostFrequent = HashPSQ.insert k now () arcGhostFrequent' } Just x@(_, w) - -> let arcTargetRecent' = arcTargetRecent |- max avgWeight (round $ toRational (OrdPSQ.size arcGhostRecent) / toRational (OrdPSQ.size arcGhostFrequent) * toRational avgWeight) + -> let arcTargetRecent' = arcTargetRecent |- max avgWeight (round $ toRational (HashPSQ.size arcGhostRecent) / toRational (HashPSQ.size arcGhostFrequent) * toRational avgWeight) (arcFrequent', arcFrequentWeight', arcGhostFrequent'') = evictToSize (arcMaximumWeight |- arcTargetRecent' |- w) arcFrequent arcFrequentWeight arcGhostFrequent' (arcRecent', arcRecentWeight', arcGhostRecent') = evictToSize (max arcTargetRecent' $ arcMaximumWeight |- arcFrequentWeight' |- w) arcRecent arcRecentWeight arcGhostRecent in oldARC { arcRecent = arcRecent' - , arcFrequent = OrdPSQ.insert k now x arcFrequent' + , arcFrequent = HashPSQ.insert k now x arcFrequent' , arcGhostRecent = arcGhostRecent' , arcGhostFrequent = arcGhostFrequent'' , arcRecentWeight = arcRecentWeight' @@ -171,35 +198,35 @@ arcAlterF k f oldARC@ARC{..} now } | otherwise -> f Nothing <&> \case Nothing -> oldARC - { arcGhostRecent = OrdPSQ.insert k now () $ evictGhostToCount arcGhostRecent + { arcGhostRecent = HashPSQ.insert k now () $ evictGhostToCount arcGhostRecent } Just x@(_, w) -> let (arcRecent', arcRecentWeight', arcGhostRecent') = evictToSize (max arcTargetRecent (arcMaximumWeight |- arcFrequentWeight) |- w) arcRecent arcRecentWeight arcGhostRecent in oldARC - { arcRecent = OrdPSQ.insert k now x arcRecent' + { arcRecent = HashPSQ.insert k now x arcRecent' , arcRecentWeight = arcRecentWeight' + w , arcGhostRecent = arcGhostRecent' } where - avgWeight = round $ toRational (arcRecentWeight + arcFrequentWeight) / toRational (OrdPSQ.size arcFrequent + OrdPSQ.size arcRecent) + avgWeight = round $ toRational (arcRecentWeight + arcFrequentWeight) / toRational (HashPSQ.size arcFrequent + HashPSQ.size arcRecent) later :: ARCTick later = over getARCTick succ now - evictToSize :: w -> OrdPSQ k ARCTick (v, w) -> w -> OrdPSQ k ARCTick () -> (OrdPSQ k ARCTick (v, w), w, OrdPSQ k ARCTick ()) + evictToSize :: w -> HashPSQ (Hashed k) ARCTick (v, w) -> w -> HashPSQ (Hashed k) ARCTick () -> (HashPSQ (Hashed k) ARCTick (v, w), w, HashPSQ (Hashed k) ARCTick ()) evictToSize tSize c cSize ghostC | cSize <= tSize = (c, cSize, ghostC) - | Just (k', p', (_, w'), c') <- OrdPSQ.minView c = evictToSize tSize c' (cSize - w') . evictGhostToCount $ OrdPSQ.insert k' p' () ghostC + | Just (k', p', (_, w'), c') <- HashPSQ.minView c = evictToSize tSize c' (cSize - w') . evictGhostToCount $ HashPSQ.insert k' p' () ghostC | otherwise = error "evictToSize: cannot reach required size through eviction" - evictGhostToCount :: OrdPSQ k ARCTick () -> OrdPSQ k ARCTick () + evictGhostToCount :: HashPSQ (Hashed k) ARCTick () -> HashPSQ (Hashed k) ARCTick () evictGhostToCount c - | OrdPSQ.size c <= arcMaximumGhost = c - | Just (_, _, _, c') <- OrdPSQ.minView c = evictGhostToCount c' + | HashPSQ.size c <= arcMaximumGhost = c + | Just (_, _, _, c') <- HashPSQ.minView c = evictGhostToCount c' | otherwise = error "evictGhostToCount: cannot reach required count through eviction" lookupARC :: forall k w v. - ( Ord k + ( Ord k, Hashable k , Integral w ) => k @@ -208,7 +235,7 @@ lookupARC :: forall k w v. lookupARC k = getConst . uncurry (arcAlterF k Const) insertARC :: forall k w v. - ( Ord k + ( Ord k, Hashable k , Integral w ) => k @@ -221,6 +248,8 @@ insertARC k newVal = (runIdentity .) . arcAlterF k (const $ pure newVal) newtype ARCHandle k w v = ARCHandle { _getARCHandle :: IORef (ARC k w v, ARCTick) } deriving (Eq, Typeable) +type DynARCHandle k w = ARCHandle (SomeTypeRep, k) w NFDynamic + initARCHandle :: forall k w v m. ( MonadIO m , Integral w @@ -232,7 +261,7 @@ initARCHandle maxGhost maxWeight = fmap ARCHandle . newIORef $ initARC maxGhost cachedARC' :: forall k w v m. ( MonadIO m - , Ord k + , Ord k, Hashable k , Integral w , NFData k, NFData w, NFData v ) @@ -261,7 +290,7 @@ cachedARC' (ARCHandle arcVar) k f = do cachedARC :: forall k w v m. ( MonadIO m - , Ord k + , Ord k, Hashable k , Integral w , NFData k, NFData w, NFData v ) @@ -273,7 +302,7 @@ cachedARC h k f = fromMaybe (error "cachedARC: cachedARC' returned Nothing") <$> lookupARCHandle :: forall k w v m. ( MonadIO m - , Ord k + , Ord k, Hashable k , Integral w ) => ARCHandle k w v diff --git a/src/Utils/I18n.hs b/src/Utils/I18n.hs index 08595630d..9f71d05dc 100644 --- a/src/Utils/I18n.hs +++ b/src/Utils/I18n.hs @@ -57,7 +57,7 @@ data I18n a = I18n , i18nFallbackLang :: Maybe Lang , i18nTranslations :: Map Lang a } deriving (Eq, Ord, Read, Show, Functor, Foldable, Traversable, Data, Generic, Typeable) - deriving anyclass (MonoFunctor, MonoFoldable, MonoTraversable, Binary) + deriving anyclass (MonoFunctor, MonoFoldable, MonoTraversable, Binary, NFData) type instance Element (I18n a) = a type I18nText = I18n Text diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 97d26cbac..b4a7fe5c3 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -272,7 +272,7 @@ relabel l s (SampleGroup i t ss) = SampleGroup i t . flip map ss $ \(Sample k lb data ARCMetrics = ARCMetrics -data ARCLabel = ARCFileSource +data ARCLabel = ARCFileSource | ARCMemcachedLocal deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) deriving anyclass (Universe, Finite) diff --git a/src/Utils/Postgresql.hs b/src/Utils/Postgresql.hs new file mode 100644 index 000000000..b9c19efbc --- /dev/null +++ b/src/Utils/Postgresql.hs @@ -0,0 +1,67 @@ +module Utils.Postgresql + ( PostgresqlChannel(..) + , PostgresqlChannelManager(..) + , managePostgresqlChannel + , PostgresConf + ) where + +import Import.NoFoundation hiding (bracket) + +import qualified Database.PostgreSQL.Simple as PG +import qualified Database.PostgreSQL.Simple.Types as PG +import qualified Database.PostgreSQL.Simple.Notification as PG + +import Database.Persist.Postgresql (PostgresConf, pgConnStr) + +import UnliftIO.Exception (bracket) + + +data PostgresqlChannel + = ChannelJobOffload + | ChannelMemcachedLocalInvalidation + deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable) + deriving anyclass (Universe, Finite) + +nullaryPathPiece ''PostgresqlChannel $ camelToPathPiece' 1 + + +data PostgresqlChannelManager m a = PostgresqlChannelManager + { pgcTerminate :: m a -- ^ Expected to block; used within `race` + , pgcOnInput :: Maybe (ByteString -> m ()) + , pgcGenOutput :: m ByteString -- ^ Expected to block; used within `race` + } + +managePostgresqlChannel :: forall m a. + ( MonadUnliftIO m + , MonadLogger m + ) + => PostgresConf + -> PostgresqlChannel + -> PostgresqlChannelManager m a + -> m a +managePostgresqlChannel dbConf (toPathPiece -> chan) PostgresqlChannelManager{..} = bracket (liftIO . PG.connectPostgreSQL $ pgConnStr dbConf) (liftIO . PG.close) $ \pgConn -> do + myPid <- liftIO $ PG.getBackendPID pgConn + when (is _Just pgcOnInput) $ + void . liftIO . PG.execute pgConn "LISTEN ?" . PG.Only $ PG.Identifier chan + + let + getInput = do + n@PG.Notification{..} <- liftIO $ PG.getNotification pgConn + if | notificationPid == myPid || notificationChannel /= encodeUtf8 chan -> getInput + | otherwise -> return n + + foreverBreak $ \terminate -> do + io <- lift . (pgcTerminate `race`) $ if + | is _Just pgcOnInput -> getInput `race` pgcGenOutput + | otherwise -> Right <$> pgcGenOutput + + case io of + Right (Left notif@PG.Notification{..}) -> do + $logDebugS "PGChannel" $ "Got input: " <> tshow notif + lift $ maybe (return ()) ($ notificationData) pgcOnInput + Right (Right o) -> do + void . liftIO $ PG.execute pgConn "NOTIFY ?, ?" (PG.Identifier chan, o) + $logDebugS "PGChannel" $ "Sent output: " <> tshow o + Left t -> do + $logDebugS "PGChannel" "Terminating..." + terminate t diff --git a/src/Yesod/Core/Types/Instances.hs b/src/Yesod/Core/Types/Instances.hs index 744e62256..ba60b6680 100644 --- a/src/Yesod/Core/Types/Instances.hs +++ b/src/Yesod/Core/Types/Instances.hs @@ -81,3 +81,4 @@ instance site ~ site' => ToWidget site (SomeMessage site') where deriving instance Generic AuthResult instance Binary AuthResult +instance NFData AuthResult