From 8356d8d2beebe02244e718f0aabc593b39d359f0 Mon Sep 17 00:00:00 2001
From: Chen Chen
Date: Wed, 14 Aug 2024 14:59:55 -0500
Subject: [PATCH] bench: fix async backoff

---
 src/bench.rs | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/src/bench.rs b/src/bench.rs
index f3df46a..fd53c1d 100644
--- a/src/bench.rs
+++ b/src/bench.rs
@@ -645,6 +645,22 @@ impl RateLimiter {
         Self { ops }
     }
 
+    /// Returns whether the backoff is done.
+    #[inline(always)]
+    fn try_backoff(&self, count: u64, start: Instant) -> bool {
+        if self.ops == 0 {
+            return true;
+        }
+        // self.ops is the target rate in ops per second (count * 1e9 / elapsed_nanos)
+        let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap();
+        let ops = count * 1_000_000_000 / elapsed;
+        if ops <= self.ops {
+            return true;
+        }
+        false
+    }
+
+    /// Blocking backoff.
     #[inline(always)]
     fn backoff(&self, count: u64, start: Instant) {
         if self.ops == 0 {
@@ -838,13 +854,11 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) {
             }
         }
 
-        // try limit rate after a batch is sent
-        rate_limiter.backoff(*counter, start);
-
         if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) {
             workload.reset();
             break;
         }
+
         // use a loop to make sure that pending is under qd, only drain the handle if the bench
         // phase is not ending
         loop {
@@ -857,7 +871,8 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) {
                     l.async_record(r.id, submit);
                 }
             }
-            if pending <= benchmark.qd {
+            // break once pending is at or below the queue depth and the backoff is done
+            if pending <= benchmark.qd && rate_limiter.try_backoff(*counter, start) {
                 break;
             }
         }