diff --git a/config/settings.yml b/config/settings.yml index 252507577..9524f75b2 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -161,8 +161,8 @@ upload-cache: disable-cert-validation: "_env:UPLOAD_S3_DISABLE_CERT_VALIDATION:false" upload-cache-bucket: "uni2work-uploads" -inject-files: 307 -rechunk-files: 601 +inject-files: 601 +rechunk-files: 1201 file-upload-db-chunksize: 4194304 # 4MiB file-chunking-target-exponent: 21 # 2MiB diff --git a/src/Jobs.hs b/src/Jobs.hs index d461b539b..37428a84c 100644 --- a/src/Jobs.hs +++ b/src/Jobs.hs @@ -454,10 +454,14 @@ handleJobs' wNum = C.mapM_ $ \jctl -> hoist delimitInternalState . withJobWorker handleQueueException (JNonexistant jId) = $logInfoS logIdent $ "Saw nonexistant queue id: " ++ tshow (fromSqlKey jId) handleQueueException (JLocked jId lInstance lTime) = $logDebugS logIdent $ "Saw locked QueuedJob: " ++ tshow (fromSqlKey jId, lInstance, lTime) + handleCmd :: JobCtl -> ReaderT JobContext Handler () handleCmd JobCtlTest = $logDebugS logIdent "JobCtlTest" handleCmd JobCtlFlush = do $logDebugS logIdent "JobCtlFlush..." - void . lift . runDB . runConduit $ selectKeys [] [ Asc QueuedJobCreationTime ] .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) + heldLocks <- asks jobHeldLocks >>= readTVarIO + void . lift . runDB . runConduit + $ selectKeys [ QueuedJobId /<-. Set.toList heldLocks ] [ Asc QueuedJobCreationTime ] + .| C.mapM_ (\j -> lift $ runReaderT (writeJobCtl $ JobCtlPerform j) =<< getYesod) $logInfoS logIdent "JobCtlFlush" handleCmd (JobCtlQueue job) = do $logDebugS logIdent "JobCtlQueue..." diff --git a/src/Jobs/Handler/Files.hs b/src/Jobs/Handler/Files.hs index fa1eec303..2648edd05 100644 --- a/src/Jobs/Handler/Files.hs +++ b/src/Jobs/Handler/Files.hs @@ -1,3 +1,5 @@ +{-# OPTIONS_GHC -Wno-error=deprecations #-} + {-# LANGUAGE BangPatterns #-} module Jobs.Handler.Files @@ -238,26 +240,33 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do logger <- askLoggerIO didSend <- maybeT (return False) . hoistMaybeM . runAppMinio . runMaybeT $ do objRes <- catchIfMaybeT minioIsDoesNotExist $ Minio.getObject uploadBucket obj Minio.defaultGetObjectOptions - let sendChunks = go 0 0 Nothing . toNanoSecs =<< liftIO (getTime Monotonic) + let sendChunks = go 0 0 Nothing =<< liftIO (getTime Monotonic) where - go :: forall m. MonadIO m => Natural -> Int64 -> Maybe Integer -> Integer -> ConduitT ByteString Void m () + go :: forall m. MonadIO m => Natural -> Int64 -> Maybe TimeSpec -> TimeSpec -> ConduitT ByteString Void m () go c accsz lastReport startT = do - currT <- liftIO $ toNanoSecs <$> getTime Monotonic + currT <- liftIO $ getTime Monotonic chunk' <- await whenIsJust chunk' $ \chunk -> do let csz = fromIntegral $ olength chunk - !sz' = accsz + csz - p :: Centi - p = realToFrac $ (toInteger sz' % toInteger sz) * 100 !c' = succ c - eta :: Integer - eta = ceiling $ ((toRational currT - toRational startT) / toRational accsz) * toRational (sz - fromIntegral accsz) + !sz' = accsz + csz !lastReport' - | currT - fromMaybe startT lastReport > 5e9 = Just currT + | toRational currT - toRational (fromMaybe startT lastReport) > 5 = Just currT | otherwise = lastReport - when (lastReport' /= lastReport) $ - runLoggingT ?? logger $ $logInfoS "InjectFiles" [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%) ETA #{textDuration eta}...|] - atomically . putTMVar chunkVar $ Just chunk + when (csz > 0) $ do + let p :: Centi + p = realToFrac $ (toInteger sz' % toInteger sz) * 100 + eta :: Maybe Integer + eta = do + accsz' <- assertM' (/= 0) accsz + return . ceiling $ (toRational currT - toRational startT) / (fromIntegral accsz') * (fromIntegral sz - fromIntegral accsz) + when (lastReport' /= lastReport || sz' >= fromIntegral sz) $ + flip runLoggingT logger . $logInfoS "InjectFiles" . mconcat $ catMaybes + [ pure [st|Sinking chunk ##{tshow c} (#{textBytes csz}): #{textBytes sz'}/#{textBytes sz} (#{tshow p}%)|] + , eta <&> \eta' -> [st| ETA #{textDuration eta'}|] + , pure "..." + ] + atomically . putTMVar chunkVar $ Just chunk go c' sz' lastReport' startT lift . runConduit $ Minio.gorObjectStream objRes .| sendChunks return True @@ -279,7 +288,7 @@ dispatchJobInjectFiles = JobHandlerException . maybeT (return ()) $ do .| maybe (C.map id) (takeWhileTime . (/ 2)) interval .| transPipe (lift . runDB . setSerializable) (persistentTokenBucketTakeC' TokenBucketInjectFiles $ views _1 Minio.oiSize) .| C.mapM (lift . injectOrDelete) - .| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeRechunkedFiles inj sz) + .| C.mapM (\res@(Sum inj, Sum sz) -> res <$ observeInjectedFiles inj sz) .| C.fold $logInfoS "InjectFiles" [st|Injected #{tshow injectedFiles} files from upload cache into database (#{textBytes injectedSize})|] diff --git a/src/Utils/Metrics.hs b/src/Utils/Metrics.hs index 16c6f994c..f9cfc0b2e 100644 --- a/src/Utils/Metrics.hs +++ b/src/Utils/Metrics.hs @@ -78,7 +78,7 @@ httpRequestLatency :: Vector Label3 Histogram httpRequestLatency = unsafeRegister . vector ("handler", "method", "status") $ histogram info buckets where info = Info "http_request_duration_seconds" "HTTP request latency" - buckets = histogramBuckets 50e-6 500 + buckets = histogramBuckets 50e-6 5000 data ReadySince = MkReadySince