fix(jobs): better flushing, correct metrics, better etas
This commit is contained in:
parent
105069a24d
commit
e4416e7f0e
@ -161,8 +161,8 @@ upload-cache:
|
||||
disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false"
|
||||
upload-cache-bucket: "uni2work-uploads"
|
||||
|
||||
inject-files: 307
|
||||
rechunk-files: 601
|
||||
inject-files: 601
|
||||
rechunk-files: 1201
|
||||
|
||||
file-upload-db-chunksize: 4194304 # 4MiB
|
||||
file-chunking-target-exponent: 21 # 2MiB
|
||||
|
||||
@ -454,10 +454,14 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker
|
||||
handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId)
|
||||
handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime)
|
||||
|
||||
handleCmd :: JobCtl -> ReaderT JobContext Handler ()
|
||||
handleCmd JobCtlTest = $logDebugS logIdent "JobCtlTest"
|
||||
handleCmd JobCtlFlush = do
|
||||
$logDebugS logIdent "JobCtlFlush..."
|
||||
void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
||||
heldLocks <- asks jobHeldLocks >>= readTVarIO
|
||||
void . lift . runDB . runConduit
|
||||
$ selectKeys [ QueuedJobId /<-. Set.toList heldLocks ] [ Asc QueuedJobCreationTime ]
|
||||
.| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod)
|
||||
$logInfoS logIdent "JobCtlFlush"
|
||||
handleCmd (JobCtlQueue job) = do
|
||||
$logDebugS logIdent "JobCtlQueue..."
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
{-# OPTIONS_GHC -Wno-error=deprecations #-}
|
||||
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
module Jobs.Handler.Files
|
||||
@ -238,26 +240,33 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
logger <- askLoggerIO
|
||||
didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do
|
||||
objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions
|
||||
let sendChunks = go 0 0 Nothing . toNanoSecs =<< liftIO (getTime Monotonic)
|
||||
let sendChunks = go 0 0 Nothing =<< liftIO (getTime Monotonic)
|
||||
where
|
||||
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe Integer -> Integer -> ConduitT ByteString Void m ()
|
||||
go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString Void m ()
|
||||
go c accsz lastReport startT = do
|
||||
currT <- liftIO $ toNanoSecs <$> getTime Monotonic
|
||||
currT <- liftIO $ getTime Monotonic
|
||||
chunk' <- await
|
||||
whenIsJust chunk' $ \chunk -> do
|
||||
let csz = fromIntegral $ olength chunk
|
||||
!sz' = accsz + csz
|
||||
p :: Centi
|
||||
p = realToFrac $ (toInteger sz' % toInteger sz) * 100
|
||||
!c' = succ c
|
||||
eta :: Integer
|
||||
eta = ceiling $ ((toRational currT - toRational startT) / toRational accsz) * toRational (sz - fromIntegral accsz)
|
||||
!sz' = accsz + csz
|
||||
!lastReport'
|
||||
| currT - fromMaybe startT lastReport > 5e9 = Just currT
|
||||
| toRational currT - toRational (fromMaybe startT lastReport) > 5 = Just currT
|
||||
| otherwise = lastReport
|
||||
when (lastReport' /= lastReport) $
|
||||
runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%) ETA #{textDuration eta}...|]
|
||||
atomically . putTMVar chunkVar $ Just chunk
|
||||
when (csz > 0) $ do
|
||||
let p :: Centi
|
||||
p = realToFrac $ (toInteger sz' % toInteger sz) * 100
|
||||
eta :: Maybe Integer
|
||||
eta = do
|
||||
accsz' <- assertM' (/= 0) accsz
|
||||
return . ceiling $ (toRational currT - toRational startT) / (fromIntegral accsz') * (fromIntegral sz - fromIntegral accsz)
|
||||
when (lastReport' /= lastReport || sz' >= fromIntegral sz) $
|
||||
flip runLoggingT logger . $logInfoS "InjectFiles" . mconcat $ catMaybes
|
||||
[ pure [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|]
|
||||
, eta <&> \eta' -> [st| ETA #{textDuration eta'}|]
|
||||
, pure "..."
|
||||
]
|
||||
atomically . putTMVar chunkVar $ Just chunk
|
||||
go c' sz' lastReport' startT
|
||||
lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks
|
||||
return True
|
||||
@ -279,7 +288,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
|
||||
.| maybe (C.map id) (takeWhileTime . (/ 2)) interval
|
||||
.| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize)
|
||||
.| C.mapM (lift . injectOrDelete)
|
||||
.| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeRechunkedFiles inj sz)
|
||||
.| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeInjectedFiles inj sz)
|
||||
.| C.fold
|
||||
|
||||
$logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{textBytes injectedSize})|]
|
||||
|
||||
@ -78,7 +78,7 @@ httpRequestLatency :: Vector Label3 Histogram
|
||||
httpRequestLatency = unsafeRegister . vector ("handler", "method", "status") $ histogram info buckets
|
||||
where info = Info "http_request_duration_seconds"
|
||||
"HTTP request latency"
|
||||
buckets = histogramBuckets 50e-6 500
|
||||
buckets = histogramBuckets 50e-6 5000
|
||||
|
||||
data ReadySince = MkReadySince
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user