diff --git a/stestr/commands/run.py b/stestr/commands/run.py index 8c02b24..a1ca9f5 100644 --- a/stestr/commands/run.py +++ b/stestr/commands/run.py @@ -18,6 +18,7 @@ import os import subprocess import sys +import warnings from cliff import command import subunit @@ -244,6 +245,12 @@ def get_parser(self, prog_name): help="If set, show non-text attachments. This is " "generally only useful for debug purposes.", ) + parser.add_argument( + "--dynamic", + action="store_true", + default=False, + help="Enable the EXPERIMENTAL dynamic scheduler", + ) return parser def take_action(self, parsed_args): @@ -335,6 +342,7 @@ def take_action(self, parsed_args): all_attachments=all_attachments, show_binary_attachments=args.show_binary_attachments, pdb=args.pdb, + dynamic=args.dynamic, ) # Always output slowest test info if requested, regardless of other @@ -396,6 +404,7 @@ def run_command( all_attachments=False, show_binary_attachments=True, pdb=False, + dynamic=False, ): """Function to execute the run command @@ -460,6 +469,7 @@ def run_command( :param str pdb: Takes in a single test_id to bypasses test discover and just execute the test specified without launching any additional processes. A file name may be used in place of a test name. + :param bool dynamic: Enable dynamic scheduling :return return_code: The exit code for the command. 0 for success and > 0 for failures. @@ -501,6 +511,11 @@ def run_command( ) stdout.write(msg) return 2 + if dynamic: + warnings.warn( + "WARNING: The dynamic scheduler is still experimental. 
" + "You might encounter issues while using it" + ) if combine: latest_id = repo.latest_id() combine_id = str(latest_id) @@ -542,7 +557,8 @@ def run_tests(): ( "subunit", output.ReturnCodeToSubunit( - subprocess.Popen(run_cmd, shell=True, stdout=subprocess.PIPE) + subprocess.Popen(run_cmd, shell=True, stdout=subprocess.PIPE), + dynamic=False, ), ) ] @@ -645,6 +661,7 @@ def run_tests(): top_dir=top_dir, test_path=test_path, randomize=random, + dynamic=dynamic, ) if isolated: result = 0 @@ -684,6 +701,7 @@ def run_tests(): suppress_attachments=suppress_attachments, all_attachments=all_attachments, show_binary_attachments=show_binary_attachments, + dynamic=dynamic, ) if run_result > result: result = run_result @@ -702,6 +720,7 @@ def run_tests(): suppress_attachments=suppress_attachments, all_attachments=all_attachments, show_binary_attachments=show_binary_attachments, + dynamic=dynamic, ) else: # Where do we source data about the cause of conflicts. @@ -771,16 +790,28 @@ def _run_tests( suppress_attachments=False, all_attachments=False, show_binary_attachments=False, + dynamic=False, ): """Run the tests cmd was parameterised with.""" cmd.setUp() try: def run_tests(): - run_procs = [ - ("subunit", output.ReturnCodeToSubunit(proc)) - for proc in cmd.run_tests() - ] + if not dynamic or cmd.concurrency == 1: + run_procs = [ + ("subunit", output.ReturnCodeToSubunit(proc, dynamic=False)) + for proc in cmd.run_tests() + ] + else: + run_procs = [ + ( + "subunit", + output.ReturnCodeToSubunit( + os.fdopen(proc["stream"], "rb"), proc["proc"] + ), + ) + for proc in cmd.run_tests() + ] if not run_procs: stdout.write("The specified regex doesn't match with anything") return 1 diff --git a/stestr/config_file.py b/stestr/config_file.py index 6a66503..215be27 100644 --- a/stestr/config_file.py +++ b/stestr/config_file.py @@ -113,6 +113,7 @@ def get_run_command( exclude_regex=None, randomize=False, parallel_class=None, + dynamic=False, ): """Get a test_processor.TestProcessorFixture for 
this config file @@ -158,6 +159,7 @@ def get_run_command( stestr scheduler by class. If both this and the corresponding config file option which includes `group-regex` are set, this value will be used. + :param bool dynamic: Enable dynamic scheduling :returns: a TestProcessorFixture object for the specified config file and any arguments passed into this function @@ -236,4 +238,5 @@ def group_callback(test_id, regex=re.compile(group_regex)): exclude_regex=exclude_regex, include_list=include_list, randomize=randomize, + dynamic=dynamic, ) diff --git a/stestr/output.py b/stestr/output.py index 9af043e..b194859 100644 --- a/stestr/output.py +++ b/stestr/output.py @@ -164,21 +164,33 @@ class ReturnCodeToSubunit: generating subunit. """ - def __init__(self, process): + def __init__(self, process, thread=None, dynamic=True): """Adapt a process to a readable stream.""" self.proc = process self.done = False - self.source = self.proc.stdout + if dynamic: + self.source = process + self.proc = thread + else: + self.source = self.proc.stdout + self.dynamic = dynamic self.lastoutput = bytes((b"\n")[0]) def __del__(self): - self.proc.wait() + if hasattr(self.proc, "wait"): + self.proc.wait() + else: + self.proc.join() def _append_return_code_as_test(self): if self.done is True: return self.source = io.BytesIO() - returncode = self.proc.wait() + if not self.dynamic: + returncode = self.proc.wait() + else: + self.proc.join() + returncode = self.proc.exitcode if returncode != 0: if self.lastoutput != bytes((b"\n")[0]): # Subunit V1 is line orientated, it has to start on a fresh diff --git a/stestr/scheduler.py b/stestr/scheduler.py index e5816c1..bb7a2ac 100644 --- a/stestr/scheduler.py +++ b/stestr/scheduler.py @@ -21,6 +21,74 @@ from stestr import selection +def get_dynamic_test_list( + test_ids, repository=None, group_callback=None, randomize=False +): + dynamic_test_list = [] + _group_callback = group_callback + time_data = {} + if randomize: + return random.sample(test_ids, len(test_ids)) + 
if repository: + time_data = repository.get_test_times(test_ids) + timed_tests = time_data["known"] + unknown_tests = time_data["unknown"] + else: + timed_tests = {} + unknown_tests = set(test_ids) + # Group tests: generate group_id -> test_ids. + group_ids = collections.defaultdict(list) + if _group_callback is None: + + def group_callback(_): + return None + + else: + group_callback = _group_callback + for test_id in test_ids: + group_id = group_callback(test_id) or test_id + group_ids[group_id].append(test_id) + # Time groups: generate three sets of groups: + # - fully timed dict(group_id -> time), + # - partially timed dict(group_id -> time) and + # - unknown (set of group_id) + # We may in future treat partially timed different for scheduling, but + # at least today we just schedule them after the fully timed groups. + timed = {} + partial = {} + unknown = [] + for group_id, group_tests in group_ids.items(): + untimed_ids = unknown_tests.intersection(group_tests) + group_time = sum( + [ + timed_tests[test_id] + for test_id in untimed_ids.symmetric_difference(group_tests) + ] + ) + if not untimed_ids: + timed[group_id] = group_time + elif group_time: + partial[group_id] = group_time + else: + unknown.append(group_id) + + # Scheduling is NP complete in general, so we avoid aiming for + # perfection. A quick approximation that is sufficient for our general + # needs: + # sort the groups by time + # allocate to partitions by putting each group in to the partition with + # the current (lowest time, shortest length[in tests]) + def consume_queue(groups): + queue = sorted(groups.items(), key=operator.itemgetter(1), reverse=True) + dynamic_test_list.extend([group[0] for group in queue]) + + consume_queue(timed) + consume_queue(partial) + dynamic_test_list.extend(unknown) + + return dynamic_test_list + + def partition_tests(test_ids, concurrency, repository, group_callback, randomize=False): """Partition test_ids by concurrency. 
diff --git a/stestr/test_processor.py b/stestr/test_processor.py index f52596a..dd2ce8e 100644 --- a/stestr/test_processor.py +++ b/stestr/test_processor.py @@ -10,7 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. +import functools import io +import multiprocessing import os import re import signal @@ -24,6 +26,8 @@ from stestr import results from stestr import scheduler from stestr import selection +from stestr.subunit_runner import program +from stestr.subunit_runner import run from stestr import testlist @@ -94,6 +98,7 @@ def __init__( exclude_regex=None, include_list=None, randomize=False, + dynamic=False, ): """Create a TestProcessorFixture.""" @@ -115,6 +120,7 @@ def __init__( self.include_list = include_list self.exclude_regex = exclude_regex self.randomize = randomize + self.dynamic = dynamic def setUp(self): super().setUp() @@ -249,6 +255,31 @@ def list_tests(self): ids = testlist.parse_enumeration(out) return ids + def _dynamic_run_tests(self, job_queue, subunit_pipe): + while True: + # NOTE(mtreinish): Open on each loop iteration with a dup to + # remove the chance of being garbage collected. 
Without this + # you'll be fighting random Bad file descriptor errors + subunit_pipe = os.fdopen(os.dup(subunit_pipe.fileno()), "wb") + if job_queue.empty(): + subunit_pipe.close() + return + try: + test_id = job_queue.get(block=False) + except Exception: + subunit_pipe.close() + return + if not test_id: + os.close(subunit_pipe.fileno()) + raise ValueError("Invalid blank test_id: %s" % test_id) + cmd_list = [self.cmd, test_id] + test_runner = run.SubunitTestRunner + program.TestProgram( + module=None, + argv=cmd_list, + testRunner=functools.partial(test_runner, stdout=subunit_pipe), + ) + def run_tests(self): """Run the tests defined by the command @@ -280,19 +311,41 @@ def run_tests(self): test_id_groups = scheduler.partition_tests( test_ids, self.concurrency, self.repository, self._group_callback ) - for test_ids in test_id_groups: - if not test_ids: - # No tests in this partition - continue - fixture = self.useFixture( - TestProcessorFixture( - test_ids, - self.template, - self.listopt, - self.idoption, - self.repository, - parallel=False, + if not self.dynamic: + for test_ids in test_id_groups: + if not test_ids: + # No tests in this partition + continue + fixture = self.useFixture( + TestProcessorFixture( + test_ids, + self.template, + self.listopt, + self.idoption, + self.repository, + parallel=False, + ) ) + result.extend(fixture.run_tests()) + return result + else: + test_id_list = scheduler.get_dynamic_test_list( + test_ids, self.repository, self._group_callback ) - result.extend(fixture.run_tests()) - return result + test_list = multiprocessing.Queue() + + for test_id in test_id_list: + test_list.put(test_id) + + for i in range(self.concurrency): + fd_pipe_r, fd_pipe_w = multiprocessing.Pipe(False) + name = "worker-%s" % i + proc = multiprocessing.Process( + target=self._dynamic_run_tests, + name=name, + args=(test_list, fd_pipe_w), + ) + proc.start() + stream_read = os.dup(fd_pipe_r.fileno()) + result.append({"stream": stream_read, "proc": proc}) + return 
result diff --git a/stestr/tests/test_config_file.py b/stestr/tests/test_config_file.py index a1a1e9d..73f522f 100644 --- a/stestr/tests/test_config_file.py +++ b/stestr/tests/test_config_file.py @@ -74,6 +74,7 @@ def _check_get_run_command( exclude_regex=None, exclude_list=None, concurrency=0, + dynamic=False, group_callback=expected_group_callback, test_filters=None, randomize=False, diff --git a/stestr/tests/test_return_codes.py b/stestr/tests/test_return_codes.py index 99c1e3c..f102607 100644 --- a/stestr/tests/test_return_codes.py +++ b/stestr/tests/test_return_codes.py @@ -94,7 +94,6 @@ def assertRunExit(self, cmd, expected, subunit=False, stdin=None): "%s" % cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) out, err = p.communicate() - if not subunit: self.assertEqual( p.returncode, expected, "Stdout: {}; Stderr: {}".format(out, err) diff --git a/tox.ini b/tox.ini index 4730fba..8d76811 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ deps = -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt commands = python tools/find_and_rm.py - stestr run {posargs} + stestr run --dynamic {posargs} [testenv:pep8] sitepackages = False