fix bug in termination procedure
cmnrd committed Jul 30, 2022
1 parent 27abf26 commit f85a3b8
Showing 2 changed files with 26 additions and 16 deletions.
include/reactor-cpp/grouped_scheduling_policy.hh: 1 change (1 addition, 0 deletions)
@@ -61,6 +61,7 @@ private:
   void schedule();
   auto finalize_group_and_notify_successors(ReactionGroup* group, std::vector<ReactionGroup*>& out_ready_groups) -> bool;
   void notify_groups(const std::vector<ReactionGroup*>& groups, std::vector<ReactionGroup*>& out_ready_groups);
+  void terminate_workers();
 
 public:
   GroupedSchedulingPolicy(Scheduler<GroupedSchedulingPolicy>& scheduler, Environment& env);
lib/grouped_scheduling_policy.cc: 41 changes (25 additions, 16 deletions)
@@ -127,21 +127,13 @@ auto GroupedSchedulingPolicy::create_worker() -> Worker<GroupedSchedulingPolicy>
 
 void GroupedSchedulingPolicy::schedule() {
   group_queue_.reset();
-  if (continue_execution_.load(std::memory_order_acquire)) {
-    log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id() << ") call next";
-    bool continue_execution = scheduler_.next();
-    std::atomic_thread_fence(std::memory_order_release);
-    if (!continue_execution) {
-      continue_execution_.store(false, std::memory_order_relaxed);
-    }
-    groups_to_process_.store(num_groups_, std::memory_order_relaxed);
-  } else {
-    log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id()
-                 << ") signal all workers to terminate";
-    std::vector<ReactionGroup*> null_groups_(environment_.num_workers() + 1, nullptr);
-    group_queue_.push(null_groups_);
-    groups_to_process_.store(environment_.num_workers(), std::memory_order_release);
+  log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id() << ") call next";
+  bool continue_execution = scheduler_.next();
+  std::atomic_thread_fence(std::memory_order_release);
+  if (!continue_execution) {
+    continue_execution_.store(false, std::memory_order_relaxed);
   }
+  groups_to_process_.store(num_groups_, std::memory_order_relaxed);
 }
 
 auto GroupedSchedulingPolicy::finalize_group_and_notify_successors(ReactionGroup* group,
@@ -173,6 +165,14 @@ void GroupedSchedulingPolicy::notify_groups(const std::vector<ReactionGroup*>& g
   std::atomic_thread_fence(std::memory_order_acquire);
 }
 
+void GroupedSchedulingPolicy::terminate_workers() {
+  log::Debug() << "(Worker " << Worker<GroupedSchedulingPolicy>::current_worker_id()
+               << ") signal all workers to terminate";
+  std::vector<ReactionGroup*> null_groups_(environment_.num_workers() + 1, nullptr);
+  group_queue_.push(null_groups_);
+  groups_to_process_.store(environment_.num_workers(), std::memory_order_release);
+}
+
 void GroupedSchedulingPolicy::worker_function(const Worker<GroupedSchedulingPolicy>& worker) {
   // This is used as a list for storing new ready groups found while processing a group.
   std::vector<ReactionGroup*> ready_groups;
@@ -206,8 +206,17 @@ void GroupedSchedulingPolicy::worker_function(const Worker<GroupedSchedulingPoli
     bool need_to_schedule = finalize_group_and_notify_successors(group, ready_groups);
 
     if (need_to_schedule) {
-      schedule();
-      notify_groups(initial_groups_, ready_groups);
+      // We use a do-while loop here as we could have scheduled events that do not trigger any reactions.
+      // In this case, ready_groups will be empty and we can simply call schedule again.
+      do {
+        if (continue_execution_.load(std::memory_order_acquire)) {
+          schedule();
+          notify_groups(initial_groups_, ready_groups);
+        } else {
+          terminate_workers();
+          break;
+        }
+      } while (ready_groups.empty());
     }
 
     log::Debug() << "(Worker " << worker.id() << ") found " << ready_groups.size()
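The sketch below is a minimal, self-contained model of the termination handshake the patch sets up: the worker that needs to schedule next either advances the scheduler or, once execution should stop, publishes nullptr sentinels so that every blocked worker wakes up and exits. It is not reactor-cpp code; BlockingQueue, the tag counter, and the odd/even "nothing ready" rule are illustrative stand-ins for group_queue_, scheduler_.next(), and events that trigger no reactions.

#include <atomic>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

constexpr unsigned num_workers = 3;
constexpr int num_tags = 5;        // the toy "program" advances through 5 tags
constexpr int groups_per_tag = 4;  // an even tag readies 4 groups, an odd tag none

struct Group { int id; };  // stand-in for ReactionGroup

// Stand-in for group_queue_: a blocking queue of group pointers where nullptr
// serves as the termination sentinel, just like in the policy above.
class BlockingQueue {
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<Group*> items_;
public:
  void push(const std::vector<Group*>& batch) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.insert(items_.end(), batch.begin(), batch.end());
    }
    cv_.notify_all();
  }
  auto pop() -> Group* {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !items_.empty(); });
    Group* group = items_.front();
    items_.pop_front();
    return group;
  }
};

BlockingQueue group_queue;
std::vector<Group> groups(groups_per_tag);
std::atomic<int> groups_to_process{0};       // mirrors groups_to_process_
std::atomic<bool> continue_execution{true};  // mirrors continue_execution_
std::atomic<int> tags_left{num_tags};        // drives the fake scheduler step

// Mirrors the new schedule(): advance to the next tag and report whether any
// group became ready. Odd tags ready nothing, modeling scheduled events that
// trigger no reactions -- the case the new do-while loop retries on.
auto schedule() -> bool {
  int tag = num_tags - tags_left.fetch_sub(1);                   // 0-based tag index
  if (tag + 1 >= num_tags) { continue_execution.store(false); }  // last tag reached
  if (tag % 2 == 1) { return false; }
  groups_to_process.store(groups_per_tag);
  std::vector<Group*> ready;
  for (auto& group : groups) { ready.push_back(&group); }
  group_queue.push(ready);
  return true;
}

// Mirrors the new terminate_workers(): one nullptr sentinel per worker wakes
// every blocked worker and makes it exit.
void terminate_workers() {
  group_queue.push(std::vector<Group*>(num_workers, nullptr));
}

void worker_function(unsigned id) {
  while (true) {
    Group* group = group_queue.pop();
    if (group == nullptr) { return; }  // sentinel -> shut down
    std::cout << "worker " << id << " ran group " << group->id << "\n";
    // The worker that finishes the last group of a tag plays the role of the
    // need_to_schedule branch: retry schedule() until something becomes ready,
    // or terminate everyone once there is nothing left to execute.
    if (groups_to_process.fetch_sub(1) == 1) {
      bool ready = false;
      do {
        if (continue_execution.load()) {
          ready = schedule();
        } else {
          terminate_workers();
          break;
        }
      } while (!ready);
    }
  }
}

int main() {
  for (int i = 0; i < groups_per_tag; i++) { groups[i].id = i; }
  schedule();  // tag 0 is even, so this readies the first batch of groups
  std::vector<std::thread> workers;
  for (unsigned i = 0; i < num_workers; i++) { workers.emplace_back(worker_function, i); }
  for (auto& worker : workers) { worker.join(); }
  return 0;
}

As in the patch, the retry loop matters because a scheduling step may ready no groups at all; the worker then simply schedules again, and only the continue_execution flag decides when to switch from scheduling to terminating.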
