Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable fork_join_executor to handle return values from scheduled functions #6383

Merged
merged 1 commit
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 123 additions & 41 deletions libs/core/executors/include/hpx/executors/fork_join_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ namespace hpx::execution::experimental {
void* element_function_;
void const* shape_;
void* argument_pack_;
void* results_;
};

// Can't apply 'using' here as the type needs to be forward
Expand Down Expand Up @@ -369,7 +370,7 @@ namespace hpx::execution::experimental {
region_data_[t].data_.state_.store(
thread_state::starting, std::memory_order_relaxed);

auto policy =
auto const policy =
launch::async_policy(priority_, stacksize_,
threads::thread_schedule_hint{
static_cast<std::int16_t>(t)});
Expand Down Expand Up @@ -501,18 +502,18 @@ namespace hpx::execution::experimental {
// (additional arguments packed into a tuple) given to
// bulk_sync_execute without wrapping it into hpx::function or
// similar.
template <typename F, typename S, typename Tuple>
template <typename Result, typename F, typename S, typename Tuple>
struct thread_function_helper
{
using argument_pack_type = std::decay_t<Tuple>;
using index_pack_type = hpx::detail::fused_index_pack_t<Tuple>;

template <std::size_t... Is_, typename F_, typename A_,
typename Tuple_>
static constexpr void invoke_helper(
static constexpr decltype(auto) invoke_helper(
hpx::util::index_pack<Is_...>, F_&& f, A_&& a, Tuple_&& t)
{
HPX_INVOKE(HPX_FORWARD(F_, f), HPX_FORWARD(A_, a),
return HPX_INVOKE(HPX_FORWARD(F_, f), HPX_FORWARD(A_, a),
hpx::get<Is_>(HPX_FORWARD(Tuple_, t))...);
}

Expand Down Expand Up @@ -555,8 +556,19 @@ namespace hpx::execution::experimental {
{
auto it = std::next(
hpx::util::begin(shape), part_begin);
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
if constexpr (std::is_void_v<Result>)
{
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
}
else
{
auto& results =
*static_cast<Result*>(data.results_);
results[part_begin] = invoke_helper(
index_pack_type{}, element_function,
*it, argument_pack);
}
}
},
[&](std::exception_ptr&& ep) {
Expand Down Expand Up @@ -604,8 +616,19 @@ namespace hpx::execution::experimental {
{
auto it =
std::next(hpx::util::begin(shape), *index);
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
if constexpr (std::is_void_v<Result>)
{
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
}
else
{
auto& results =
*static_cast<Result*>(data.results_);
results[*index] = invoke_helper(
index_pack_type{}, element_function,
*it, argument_pack);
}
}

// As loop schedule is dynamic, steal from neighboring
Expand All @@ -630,8 +653,21 @@ namespace hpx::execution::experimental {
{
auto it = std::next(
hpx::util::begin(shape), *index);
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);

if constexpr (std::is_void_v<Result>)
{
invoke_helper(index_pack_type{},
element_function, *it,
argument_pack);
}
else
{
auto& results = *static_cast<Result*>(
data.results_);
results[*index] = invoke_helper(
index_pack_type{}, element_function,
*it, argument_pack);
}
}
}
},
Expand Down Expand Up @@ -709,19 +745,21 @@ namespace hpx::execution::experimental {
}
};

template <typename F, typename S, typename Args>
template <typename Result, typename F, typename S, typename Args>
thread_function_helper_type* set_all_states_and_region_data(
thread_state state, F& f, S const& shape,
void* results, thread_state state, F& f, S const& shape,
Args& argument_pack) noexcept
{
thread_function_helper_type* func;
if (schedule_ == loop_schedule::static_ || num_threads_ == 1)
{
func = &thread_function_helper<F, S, Args>::call_static;
func = &thread_function_helper<Result, F, S,
Args>::call_static;
}
else
{
func = &thread_function_helper<F, S, Args>::call_dynamic;
func = &thread_function_helper<Result, F, S,
Args>::call_dynamic;
}

for (std::size_t t = 0; t != num_threads_; ++t)
Expand All @@ -732,6 +770,7 @@ namespace hpx::execution::experimental {
data.shape_ = &shape;
data.argument_pack_ = &argument_pack;
data.thread_function_helper_ = func;
data.results_ = results;

data.state_.store(state, std::memory_order_release);
}
Expand Down Expand Up @@ -760,9 +799,27 @@ namespace hpx::execution::experimental {
return func;
}

template <typename F>
void invoke_work(F&& f)
{
// Start work on the main thread.
f(region_data_, main_thread_, num_threads_, queues_,
exception_mutex_, exception_);

// Wait for all threads to finish their work assigned to
// them in this parallel region.
wait_state_all(thread_state::idle);

// rethrow exception, if any
if (exception_)
{
std::rethrow_exception(HPX_MOVE(exception_));
}
}

public:
template <typename F, typename S, typename... Ts>
void bulk_sync_execute(F&& f, S const& shape, Ts&&... ts)
decltype(auto) bulk_sync_execute(F&& f, S const& shape, Ts&&... ts)
{
// protect against nested use of this executor instance
if (region_data_[main_thread_].data_.state_.load(
Expand All @@ -773,53 +830,77 @@ namespace hpx::execution::experimental {
"fork_join_executor being used in nested ways?");
}

#if defined(HPX_HAVE_THREAD_DESCRIPTION)
hpx::scoped_annotation annotate(
generate_annotation(hpx::get_worker_thread_num(),
"fork_join_executor::bulk_sync_execute"));

#endif
exception_ = std::exception_ptr();

// Set the data for this parallel region
auto argument_pack =
hpx::forward_as_tuple(HPX_FORWARD(Ts, ts)...);

// Signal all worker threads to start partitioning work for
// themselves, and then starting the actual work.
thread_function_helper_type* func =
set_all_states_and_region_data(
thread_state::partitioning_work, f, shape,
argument_pack);
using result_type =
hpx::parallel::execution::detail::bulk_execute_result_t<F,
S, Ts...>;

// Start work on the main thread.
func(region_data_, main_thread_, num_threads_, queues_,
exception_mutex_, exception_);
if constexpr (std::is_void_v<result_type>)
{
// Signal all worker threads to start partitioning work for
// themselves, and then starting the actual work.
thread_function_helper_type* func =
set_all_states_and_region_data<void>(nullptr,
thread_state::partitioning_work, f, shape,
argument_pack);

invoke_work(func);
}
else
{
result_type results(hpx::util::size(shape));

// Wait for all threads to finish their work assigned to
// them in this parallel region.
wait_state_all(thread_state::idle);
// Signal all worker threads to start partitioning work for
// themselves, and then starting the actual work.
thread_function_helper_type* func =
set_all_states_and_region_data<result_type>(&results,
thread_state::partitioning_work, f, shape,
argument_pack);

// rethrow exception, if any
if (exception_)
{
std::rethrow_exception(HPX_MOVE(exception_));
invoke_work(func);

return results;
}
}

template <typename F, typename S, typename... Ts>
hpx::future<void> bulk_async_execute(
F&& f, S const& shape, Ts&&... ts)
decltype(auto) bulk_async_execute(F&& f, S const& shape, Ts&&... ts)
{
using result_type =
hpx::parallel::execution::detail::bulk_execute_result_t<F,
S, Ts...>;

// Forward to the synchronous version as we can't create
// futures to the completion of the parallel region (this HPX
// thread participates in computation).
return hpx::detail::try_catch_exception_ptr(
[&]() {
bulk_sync_execute(
HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
return hpx::make_ready_future();
if constexpr (std::is_void_v<result_type>)
{
bulk_sync_execute(HPX_FORWARD(F, f), shape,
HPX_FORWARD(Ts, ts)...);
return hpx::make_ready_future();
}
else
{
auto&& result = bulk_sync_execute(HPX_FORWARD(F, f),
shape, HPX_FORWARD(Ts, ts)...);
return hpx::make_ready_future(HPX_MOVE(result));
}
},
[&](std::exception_ptr&& ep) {
return hpx::make_exceptional_future<void>(HPX_MOVE(ep));
return hpx::make_exceptional_future<result_type>(
HPX_MOVE(ep));
});
}

Expand All @@ -838,10 +919,11 @@ namespace hpx::execution::experimental {
}

// Set the data for this parallel region
#if defined(HPX_HAVE_THREAD_DESCRIPTION)
hpx::scoped_annotation annotate(
generate_annotation(hpx::get_worker_thread_num(),
"fork_join_executor::sync_invoke"));

#endif
exception_ = std::exception_ptr();

auto args = hpx::make_tuple(first, size);
Expand Down Expand Up @@ -914,7 +996,7 @@ namespace hpx::execution::experimental {
hpx::parallel::execution::bulk_sync_execute_t,
fork_join_executor const& exec, F&& f, S const& shape, Ts&&... ts)
{
exec.shared_data_->bulk_sync_execute(
return exec.shared_data_->bulk_sync_execute(
HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
}

Expand Down Expand Up @@ -956,7 +1038,7 @@ namespace hpx::execution::experimental {
hpx::parallel::execution::sync_invoke_t,
fork_join_executor const& exec, F&& f, Fs&&... fs)
{
exec.shared_data_->sync_invoke(
return exec.shared_data_->sync_invoke(
HPX_FORWARD(F, f), HPX_FORWARD(Fs, fs)...);
}

Expand Down
10 changes: 8 additions & 2 deletions libs/core/executors/tests/regressions/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests bulk_then_execute_3182 bulk_sync_wait future_then_async_executor
parallel_executor_1781 pu_count_6184 wrapping_executor
set(tests
bulk_then_execute_3182
bulk_sync_wait
fork_join_with_result
future_then_async_executor
parallel_executor_1781
pu_count_6184
wrapping_executor
)

foreach(test ${tests})
Expand Down
33 changes: 33 additions & 0 deletions libs/core/executors/tests/regressions/fork_join_with_result.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2023 Giannis Gonidelis
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/algorithm.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

#include <functional>

int hpx_main()
{
hpx::execution::experimental::fork_join_executor exec{};

auto const result = hpx::transform_reduce(hpx::execution::par.on(exec),
hpx::util::counting_iterator(0), hpx::util::counting_iterator(100), 0L,
std::plus{}, [&](auto i) { return i * i; });

HPX_TEST_EQ(result, 328350L);

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
Loading
Loading