From f85a3b8272d6bbf1e02b7ea35356735820efb0ff Mon Sep 17 00:00:00 2001
From: Christian Menard <christian.menard@tu-dresden.de>
Date: Sat, 30 Jul 2022 17:38:35 +0200
Subject: [PATCH] fix bug in termination procedure

---
 .../reactor-cpp/grouped_scheduling_policy.hh  |  1 +
 lib/grouped_scheduling_policy.cc              | 41 +++++++++++--------
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/include/reactor-cpp/grouped_scheduling_policy.hh b/include/reactor-cpp/grouped_scheduling_policy.hh
index 13ccf214..71b57494 100644
--- a/include/reactor-cpp/grouped_scheduling_policy.hh
+++ b/include/reactor-cpp/grouped_scheduling_policy.hh
@@ -61,6 +61,7 @@ private:
   void schedule();
   auto finalize_group_and_notify_successors(ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups) -> bool;
   void notify_groups(const std::vector<ReactionGroup*>& groups, std::vector<ReactionGroup*>& out_ready_groups);
+  void terminate_workers();
 
 public:
   GroupedSchedulingPolicy(Scheduler<GroupedSchedulingPolicy>& scheduler, Environment& env);
diff --git a/lib/grouped_scheduling_policy.cc b/lib/grouped_scheduling_policy.cc
index 3c2f9178..bb173794 100644
--- a/lib/grouped_scheduling_policy.cc
+++ b/lib/grouped_scheduling_policy.cc
@@ -127,21 +127,13 @@ auto GroupedSchedulingPolicy::create_worker() -> Worker
 
 void GroupedSchedulingPolicy::schedule() {
   group_queue_.reset();
-  if (continue_execution_.load(std::memory_order_acquire)) {
-    log::Debug() << "(Worker " << Worker::current_worker_id() << ") call next";
-    bool continue_execution = scheduler_.next();
-    std::atomic_thread_fence(std::memory_order_release);
-    if (!continue_execution) {
-      continue_execution_.store(false, std::memory_order_relaxed);
-    }
-    groups_to_process_.store(num_groups_, std::memory_order_relaxed);
-  } else {
-    log::Debug() << "(Worker " << Worker::current_worker_id()
-               << ") signal all workers to terminate";
-    std::vector<ReactionGroup*> null_groups_(environment_.num_workers() + 1, nullptr);
-    group_queue_.push(null_groups_);
-    groups_to_process_.store(environment_.num_workers(), std::memory_order_release);
+  log::Debug() << "(Worker " << Worker::current_worker_id() << ") call next";
+  bool continue_execution = scheduler_.next();
+  std::atomic_thread_fence(std::memory_order_release);
+  if (!continue_execution) {
+    continue_execution_.store(false, std::memory_order_relaxed);
   }
+  groups_to_process_.store(num_groups_, std::memory_order_relaxed);
 }
 
 auto GroupedSchedulingPolicy::finalize_group_and_notify_successors(ReactionGroup* group,
@@ -173,6 +165,14 @@ void GroupedSchedulingPolicy::notify_groups(const std::vector<ReactionGroup*>& g
   std::atomic_thread_fence(std::memory_order_acquire);
 }
 
+void GroupedSchedulingPolicy::terminate_workers() {
+  log::Debug() << "(Worker " << Worker::current_worker_id()
+               << ") signal all workers to terminate";
+  std::vector<ReactionGroup*> null_groups_(environment_.num_workers() + 1, nullptr);
+  group_queue_.push(null_groups_);
+  groups_to_process_.store(environment_.num_workers(), std::memory_order_release);
+}
+
 void GroupedSchedulingPolicy::worker_function(const Worker<GroupedSchedulingPolicy>& worker) {
   // This is used as a list for storing new ready groups found while processing a group.
   std::vector<ReactionGroup*> ready_groups;
@@ -206,8 +206,17 @@ void GroupedSchedulingPolicy::worker_function(const Worker