fix(jobs): improve job worker healthchecks & logging

This commit is contained in:
Gregor Kleen 2020-09-22 02:39:03 +02:00
parent ffed57623f
commit 2a84edccb4
3 changed files with 28 additions and 13 deletions

View File

@ -251,12 +251,12 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do
p = realToFrac $ (toInteger sz' % toInteger sz) * 100
!c' = succ c
eta :: Integer
eta = ceiling $ ((currT - startT) % fromIntegral accsz) * fromIntegral (sz - fromIntegral accsz)
eta = ceiling $ (toRational (currT - startT) / fromIntegral accsz) * fromIntegral (sz - fromIntegral accsz)
!lastReport'
| currT - fromMaybe startT lastReport > 5e9 = Just currT
| otherwise = lastReport
when (lastReport' /= lastReport) $
runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{tshow sz'}/#{tshow sz} (#{tshow p}%) ETA #{tshow eta}s...|]
runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%) ETA #{textDuration eta}...|]
atomically . putTMVar chunkVar $ Just chunk
go c' sz' lastReport' startT
lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks

View File

@ -70,21 +70,23 @@ writeJobCtl cmd = do
writeJobCtl' target cmd
writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m ()
writeJobCtlBlock' :: (MonadMask m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m ()
-- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon
writeJobCtlBlock' writeCtl cmd = do
getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar
resVar <- atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
writeCtl cmd
let
removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd
mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar
maybe (return ()) throwM mExc
writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
let getResVar' = atomically $ do
var <- newEmptyTMVar
modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var)
return var
removeResVar resVar = modifyTVar' getResVar $ HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd
bracket getResVar' (atomically . removeResVar) $ \resVar -> do
writeCtl cmd
mExc <- atomically $ takeTMVar resVar <* removeResVar resVar
maybe (return ()) throwM mExc
writeJobCtlBlock :: (MonadMask m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m ()
-- | Pass an instruction to the `Job`-Workers and block until it was acted upon
writeJobCtlBlock = writeJobCtlBlock' writeJobCtl

View File

@ -298,6 +298,19 @@ textBytes x
rshow :: Double -> Text
rshow = tshow . floorToDigits 1
textDuration :: forall a. Integral a => a -> Text
textDuration n' = view _2 $ foldr acc (toInteger n', "") units
where units = sortOn (view _1)
[ (86400, "d")
, (3600, "h")
, (60, "m")
, (1, "s")
]
acc (mult, unit) (n, t)
| unitCount > 0 = (unitRem, t <> tshow unitCount <> tshow unit)
| otherwise = (n, t)
where (unitCount, unitRem) = n `divMod` mult
stepTextCounterCI :: CI Text -> CI Text -- find and increment rightmost-number, preserving leading zeroes
stepTextCounterCI = CI.map stepTextCounter