Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept some differing units for throttling #1103

Merged
merged 2 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions esrally/driver/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class Unthrottled(Scheduler):
def next(self, current):
return 0

def __str__(self):
return "unthrottled"


class DeterministicScheduler(SimpleScheduler):
"""
Expand Down Expand Up @@ -264,9 +267,19 @@ def after_request(self, now, weight, unit, request_meta_data):
expected_unit = self.task.target_throughput.unit
actual_unit = f"{unit}/s"
if actual_unit != expected_unit:
raise exceptions.RallyAssertionError(f"Target throughput for [{self.task}] is specified in "
f"[{expected_unit}] but the task throughput is measured "
f"in [{actual_unit}].")
# *temporary* workaround to convert mismatching units to ops/s to stay backwards-compatible.
#
# This ensures that we throttle based on ops/s but report based on the original unit (as before).
if expected_unit == "ops/s":
weight = 1
if self.first_request:
logging.getLogger(__name__).warning("Task [%s] throttles based on [%s] but reports [%s]. "
"Please specify the target throughput in [%s] instead.",
self.task, expected_unit, actual_unit, actual_unit)
else:
raise exceptions.RallyAssertionError(f"Target throughput for [{self.task}] is specified in "
f"[{expected_unit}] but the task throughput is measured "
f"in [{actual_unit}].")

self.first_request = False
self.current_weight = weight
Expand Down
9 changes: 7 additions & 2 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import os
import re
import sys
import tempfile
import urllib.error

Expand Down Expand Up @@ -794,8 +795,12 @@ def post_process_for_test_mode(t):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Resetting measurement time period for [%s] to [%d] seconds.", str(leaf_task), leaf_task.time_period)

leaf_task.params.pop("target-throughput", None)
leaf_task.params.pop("target-interval", None)
# Keep throttled to expose any errors but increase the target throughput for short execution times.
if leaf_task.throttled:
original_throughput = leaf_task.target_throughput
leaf_task.params.pop("target-throughput", None)
leaf_task.params.pop("target-interval", None)
leaf_task.params["target-throughput"] = f"{sys.maxsize} {original_throughput.unit}"

return t

Expand Down
19 changes: 19 additions & 0 deletions tests/driver/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ def test_scheduler_adapts_to_changed_weights(self):
s.after_request(now=None, weight=10000, unit="docs", request_meta_data=None)
self.assertEqual(2 * task.clients, s.next(0))

def test_scheduler_accepts_differing_units_pages_and_ops(self):
task = track.Task(name="scroll-query",
operation=track.Operation(
name="scroll-query",
operation_type=track.OperationType.Search.name),
clients=1,
params={
# implicitly: ops/s
"target-throughput": 10
})

s = scheduler.UnitAwareScheduler(task=task, scheduler_class=scheduler.DeterministicScheduler)
# first request is unthrottled
self.assertEqual(0, s.next(0))
# no exception despite differing units ...
s.after_request(now=None, weight=20, unit="pages", request_meta_data=None)
# ... and it is still throttled in ops/s
self.assertEqual(0.1 * task.clients, s.next(0))


class SchedulerCategorizationTests(TestCase):
class LegacyScheduler:
Expand Down