From 6da0660cbca09706d232029a0a709692d50aa953 Mon Sep 17 00:00:00 2001 From: Chen Chen Date: Wed, 14 Aug 2024 15:08:21 -0500 Subject: [PATCH] bench: fix async backoff --- src/bench.rs | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/src/bench.rs b/src/bench.rs index f3df46a..dbac8a6 100644 --- a/src/bench.rs +++ b/src/bench.rs @@ -645,6 +645,22 @@ impl RateLimiter { Self { ops } } + /// Returns whether the backoff is done. + #[inline(always)] + fn try_backoff(&self, count: u64, start: Instant) -> bool { + if self.ops == 0 { + return true; + } + // self.kops is the target rate in kops, which is op/ms + let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap(); + let ops = count * 1_000_000_000 / elapsed; + if ops <= self.ops { + return true; + } + false + } + + /// Blocking backoff. #[inline(always)] fn backoff(&self, count: u64, start: Instant) { if self.ops == 0 { @@ -838,26 +854,28 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { } } - // try limit rate after a batch is sent - rate_limiter.backoff(*counter, start); - if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) { workload.reset(); break; } + // use a loop to make sure that pending is under qd, only drain the handle if the bench // phase is not ending loop { - handle.drain(); - let responses = responder.replace_with(|_| Vec::new()); - pending -= responses.len(); - if let Some(ref mut l) = latency { - let submit = Instant::now(); - for r in responses.iter() { - l.async_record(r.id, submit); + // do not drain if the pending queue is empty + if pending > 0 { + handle.drain(); + let responses = responder.replace_with(|_| Vec::new()); + pending -= responses.len(); + if let Some(ref mut l) = latency { + let submit = Instant::now(); + for r in responses.iter() { + l.async_record(r.id, submit); + } } } - if pending <= benchmark.qd { + // if the pending queue is under depth (can be 0), and backoff is done + if pending <= benchmark.qd && rate_limiter.try_backoff(*counter, start) { break; } }