feat(metrics): measure file i/o
This commit is contained in:
parent
6d475497c0
commit
4801d22cb3
@ -40,9 +40,11 @@ sourceFileDB fileReference = do
|
|||||||
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
|
return $ E.substring (fileContentChunk E.^. FileContentChunkContent) (E.val start) (E.val dbChunksize)
|
||||||
case chunk of
|
case chunk of
|
||||||
Nothing -> throwM SourceFilesContentUnavailable
|
Nothing -> throwM SourceFilesContentUnavailable
|
||||||
Just (E.Value c) -> return . Just . (c, ) $ if
|
Just (E.Value c) -> do
|
||||||
| olength c >= dbChunksize -> Just $ start + dbChunksize
|
observeSourcedChunk StorageDB $ olength c
|
||||||
| otherwise -> Nothing
|
return . Just . (c, ) $ if
|
||||||
|
| olength c >= dbChunksize -> Just $ start + dbChunksize
|
||||||
|
| otherwise -> Nothing
|
||||||
chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
|
chunkHashes = E.selectSource . E.from $ \fileContentEntry -> do
|
||||||
E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
|
E.where_ $ fileContentEntry E.^. FileContentEntryHash E.==. E.val fileReference
|
||||||
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
|
E.orderBy [ E.asc $ fileContentEntry E.^. FileContentEntryIx ]
|
||||||
@ -65,7 +67,10 @@ sourceFileMinio fileReference = do
|
|||||||
mChunk <- atomically $ Right <$> takeTMVar chunkVar
|
mChunk <- atomically $ Right <$> takeTMVar chunkVar
|
||||||
<|> Left <$> waitCatchSTM minioAsync
|
<|> Left <$> waitCatchSTM minioAsync
|
||||||
case mChunk of
|
case mChunk of
|
||||||
Right chunk -> yield chunk >> go
|
Right chunk -> do
|
||||||
|
observeSourcedChunk StorageMinio $ olength chunk
|
||||||
|
yield chunk
|
||||||
|
go
|
||||||
Left (Right ()) -> return ()
|
Left (Right ()) -> return ()
|
||||||
Left (Left exc) -> throwM exc
|
Left (Left exc) -> throwM exc
|
||||||
in go
|
in go
|
||||||
|
|||||||
@ -101,6 +101,7 @@ handleJobs foundation@UniWorX{..}
|
|||||||
jobShutdown <- liftIO newEmptyTMVarIO
|
jobShutdown <- liftIO newEmptyTMVarIO
|
||||||
jobCurrentCrontab <- liftIO $ newTVarIO Nothing
|
jobCurrentCrontab <- liftIO $ newTVarIO Nothing
|
||||||
jobHeldLocks <- liftIO $ newTVarIO Set.empty
|
jobHeldLocks <- liftIO $ newTVarIO Set.empty
|
||||||
|
registerJobHeldLocksCount jobHeldLocks
|
||||||
atomically $ putTMVar appJobState JobState
|
atomically $ putTMVar appJobState JobState
|
||||||
{ jobContext = JobContext{..}
|
{ jobContext = JobContext{..}
|
||||||
, ..
|
, ..
|
||||||
|
|||||||
@ -7,6 +7,7 @@ module Utils.Files
|
|||||||
) where
|
) where
|
||||||
|
|
||||||
import Import.NoFoundation
|
import Import.NoFoundation
|
||||||
|
import Utils.Metrics
|
||||||
import Foundation.Type
|
import Foundation.Type
|
||||||
import Handler.Utils.Minio
|
import Handler.Utils.Minio
|
||||||
import qualified Network.Minio as Minio
|
import qualified Network.Minio as Minio
|
||||||
@ -46,6 +47,8 @@ sinkFileDB doReplace fileContentContent = do
|
|||||||
fileChunkLockTime <- liftIO getCurrentTime
|
fileChunkLockTime <- liftIO getCurrentTime
|
||||||
fileChunkLockInstance <- getsYesod appInstanceID
|
fileChunkLockInstance <- getsYesod appInstanceID
|
||||||
|
|
||||||
|
observeSunkChunk StorageDB $ olength fileContentChunkContent
|
||||||
|
|
||||||
tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
|
tellM $ Set.singleton <$> insert FileChunkLock{ fileChunkLockHash = fileContentChunkHash, .. }
|
||||||
existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
|
existsChunk <- lift $ exists [FileContentChunkHash ==. fileContentChunkHash]
|
||||||
let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
|
let setContentBased = updateWhere [FileContentChunkHash ==. fileContentChunkHash] [FileContentChunkContentBased =. fileContentChunkContentBased]
|
||||||
@ -98,8 +101,11 @@ sinkFileMinio fileContentContent = do
|
|||||||
case nextChunk of
|
case nextChunk of
|
||||||
Nothing
|
Nothing
|
||||||
-> putMVar chunk Nothing
|
-> putMVar chunk Nothing
|
||||||
Just nextChunk'
|
Just nextChunk' -> do
|
||||||
-> putMVar chunk (Just nextChunk') >> yield nextChunk' >> putChunks
|
observeSunkChunk StorageMinio $ olength nextChunk'
|
||||||
|
putMVar chunk $ Just nextChunk'
|
||||||
|
yield nextChunk'
|
||||||
|
putChunks
|
||||||
sinkAsync <- lift . allocateLinkedAsync . runConduit
|
sinkAsync <- lift . allocateLinkedAsync . runConduit
|
||||||
$ fileContentContent
|
$ fileContentContent
|
||||||
.| putChunks
|
.| putChunks
|
||||||
|
|||||||
@ -3,10 +3,12 @@ module Utils.Metrics
|
|||||||
, registerGHCMetrics
|
, registerGHCMetrics
|
||||||
, observeHTTPRequestLatency
|
, observeHTTPRequestLatency
|
||||||
, registerReadyMetric
|
, registerReadyMetric
|
||||||
|
, registerJobHeldLocksCount
|
||||||
, withJobWorkerStateLbls
|
, withJobWorkerStateLbls
|
||||||
, observeYesodCacheSize
|
, observeYesodCacheSize
|
||||||
, observeFavouritesQuickActionsDuration
|
, observeFavouritesQuickActionsDuration
|
||||||
, LoginOutcome(..), observeLoginOutcome
|
, LoginOutcome(..), observeLoginOutcome
|
||||||
|
, FileChunkStorage(..), observeSourcedChunk, observeSunkChunk
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Import.NoModel hiding (Vector, Info)
|
import Import.NoModel hiding (Vector, Info)
|
||||||
@ -25,6 +27,8 @@ import qualified Network.HTTP.Types as HTTP
|
|||||||
|
|
||||||
import Yesod.Core.Types (HandlerData(..), GHState(..))
|
import Yesod.Core.Types (HandlerData(..), GHState(..))
|
||||||
|
|
||||||
|
import qualified Data.Set as Set
|
||||||
|
|
||||||
{-# ANN module ("HLint: ignore Use even" :: String) #-}
|
{-# ANN module ("HLint: ignore Use even" :: String) #-}
|
||||||
|
|
||||||
|
|
||||||
@ -110,6 +114,31 @@ loginOutcomes = unsafeRegister . vector ("plugin", "outcome") $ counter info
|
|||||||
where info = Info "uni2work_login_attempts_total"
|
where info = Info "uni2work_login_attempts_total"
|
||||||
"Number of login attempts"
|
"Number of login attempts"
|
||||||
|
|
||||||
|
data JobHeldLocksCount = MkJobHeldLocksCount
|
||||||
|
|
||||||
|
jobHeldLocksCount :: TVar (Set QueuedJobId) -> Metric JobHeldLocksCount
|
||||||
|
jobHeldLocksCount heldLocks = Metric $ return (MkJobHeldLocksCount, collectJobHeldLocksCount)
|
||||||
|
where
|
||||||
|
collectJobHeldLocksCount = do
|
||||||
|
nLocks <- Set.size <$> readTVarIO heldLocks
|
||||||
|
let sample = encodeUtf8 $ tshow nLocks
|
||||||
|
return [SampleGroup info GaugeType [Sample "uni2work_jobs_held_locks_count" [] sample]]
|
||||||
|
info = Info "uni2work_jobs_held_locks_count"
|
||||||
|
"Number of job locks currently held by this Uni2work-instance"
|
||||||
|
|
||||||
|
{-# NOINLINE sourcedFileChunkSizes #-}
|
||||||
|
sourcedFileChunkSizes :: Vector Label1 Histogram
|
||||||
|
sourcedFileChunkSizes = unsafeRegister . vector ("storage") $ histogram info buckets
|
||||||
|
where info = Info "uni2work_sourced_file_chunks_bytes"
|
||||||
|
"Sizes of files chunks sourced"
|
||||||
|
buckets = 0 : histogramBuckets 1 20000000
|
||||||
|
|
||||||
|
{-# NOINLINE sunkFileChunkSizes #-}
|
||||||
|
sunkFileChunkSizes :: Vector Label1 Histogram
|
||||||
|
sunkFileChunkSizes = unsafeRegister . vector ("storage") $ histogram info buckets
|
||||||
|
where info = Info "uni2work_sunk_file_chunks_bytes"
|
||||||
|
"Sizes of files chunks sunk"
|
||||||
|
buckets = 0 : histogramBuckets 1 100000000
|
||||||
|
|
||||||
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
withHealthReportMetrics :: MonadIO m => m HealthReport -> m HealthReport
|
||||||
withHealthReportMetrics act = do
|
withHealthReportMetrics act = do
|
||||||
@ -196,3 +225,18 @@ observeLoginOutcome :: MonadHandler m
|
|||||||
-> m ()
|
-> m ()
|
||||||
observeLoginOutcome plugin outcome
|
observeLoginOutcome plugin outcome
|
||||||
= liftIO $ withLabel loginOutcomes (plugin, toPathPiece outcome) incCounter
|
= liftIO $ withLabel loginOutcomes (plugin, toPathPiece outcome) incCounter
|
||||||
|
|
||||||
|
registerJobHeldLocksCount :: MonadIO m => TVar (Set QueuedJobId) -> m ()
|
||||||
|
registerJobHeldLocksCount = liftIO . void . register . jobHeldLocksCount
|
||||||
|
|
||||||
|
data FileChunkStorage = StorageMinio | StorageDB
|
||||||
|
deriving (Eq, Ord, Read, Show, Enum, Bounded, Generic, Typeable)
|
||||||
|
deriving anyclass (Universe, Finite)
|
||||||
|
nullaryPathPiece ''FileChunkStorage $ camelToPathPiece' 1
|
||||||
|
|
||||||
|
observeSunkChunk, observeSourcedChunk :: (Integral n, MonadIO m) => FileChunkStorage -> n -> m ()
|
||||||
|
observeSunkChunk store = liftIO . observeChunkSize sunkFileChunkSizes store . fromIntegral
|
||||||
|
observeSourcedChunk store = liftIO . observeChunkSize sourcedFileChunkSizes store . fromIntegral
|
||||||
|
|
||||||
|
observeChunkSize :: Vector Label1 Histogram -> FileChunkStorage -> Integer -> IO ()
|
||||||
|
observeChunkSize metric (toPathPiece -> storageLabel) = withLabel metric storageLabel . flip observe . fromInteger
|
||||||
|
|||||||
Reference in New Issue
Block a user