diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index 17862fec9..69f969538 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -251,12 +251,12 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do p = realToFrac $ (toInteger sz' % toInteger sz) * 100 !c' = succ c eta :: Integer - eta = ceiling $ ((currT - startT) % fromIntegral accsz) * fromIntegral (sz - fromIntegral accsz) + eta = ceiling $ (toRational (currT - startT) / fromIntegral accsz) * fromIntegral (sz - fromIntegral accsz) !lastReport' | currT - fromMaybe startT lastReport > 5e9 = Just currT | otherwise = lastReport when (lastReport' /= lastReport) $ - runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{tshow sz'}/#{tshow sz} (#{tshow p}%) ETA #{tshow eta}s...|] + runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{tshow csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%) ETA #{textDuration eta}...|] atomically . putTMVar chunkVar $ Just chunk go c' sz' lastReport' startT lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks diff --git a/src/Jobs/Queue.hs b/src/Jobs/Queue.hs index 491ce98b7..0a495194b 100644 --- a/src/Jobs/Queue.hs +++ b/src/Jobs/Queue.hs @@ -70,21 +70,23 @@ writeJobCtl cmd = do writeJobCtl' target cmd -writeJobCtlBlock' :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m () +writeJobCtlBlock' :: (MonadMask m, MonadIO m, MonadReader UniWorX m) => (JobCtl -> m ()) -> JobCtl -> m () -- | Pass an instruction to a `Job`-Worker using the provided callback and block until it was acted upon writeJobCtlBlock' writeCtl cmd = do getResVar <- fmap (jobConfirm . jobContext) $ asks appJobState >>= atomically . readTMVar - resVar <- atomically $ do - var <- newEmptyTMVar - modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var) - return var - writeCtl cmd - let - removeResVar = HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd - mExc <- atomically $ takeTMVar resVar <* modifyTVar' getResVar removeResVar - maybe (return ()) throwM mExc -writeJobCtlBlock :: (MonadThrow m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () + let getResVar' = atomically $ do + var <- newEmptyTMVar + modifyTVar' getResVar $ HashMap.insertWith (<>) cmd (pure var) + return var + removeResVar resVar = modifyTVar' getResVar $ HashMap.update (NonEmpty.nonEmpty . NonEmpty.filter (/= resVar)) cmd + + bracket getResVar' (atomically . removeResVar) $ \resVar -> do + writeCtl cmd + mExc <- atomically $ takeTMVar resVar <* removeResVar resVar + maybe (return ()) throwM mExc + +writeJobCtlBlock :: (MonadMask m, MonadIO m, MonadReader UniWorX m) => JobCtl -> m () -- | Pass an instruction to the `Job`-Workers and block until it was acted upon writeJobCtlBlock = writeJobCtlBlock' writeJobCtl diff --git a/src/Utils.hs b/src/Utils.hs index 654bd22c9..2aa71f0db 100644 --- a/src/Utils.hs +++ b/src/Utils.hs @@ -298,6 +298,19 @@ textBytes x rshow :: Double -> Text rshow = tshow . floorToDigits 1 +textDuration :: forall a. Integral a => a -> Text +textDuration n' = view _2 $ foldr acc (toInteger n', "") units + where units = sortOn (view _1) + [ (86400, "d") + , (3600, "h") + , (60, "m") + , (1, "s") + ] + acc (mult, unit) (n, t) + | unitCount > 0 = (unitRem, t <> tshow unitCount <> tshow unit) + | otherwise = (n, t) + where (unitCount, unitRem) = n `divMod` mult + stepTextCounterCI :: CI Text -> CI Text -- find and increment rightmost-number, preserving leading zeroes stepTextCounterCI = CI.map stepTextCounter