From 5561829c012915132bec12c01121a6148f10c134 Mon Sep 17 00:00:00 2001
From: Hartmut Kaiser
Date: Thu, 9 Nov 2023 12:49:52 -0600
Subject: [PATCH] Enable fork_join_executor to handle return values from
 scheduled functions

---
 .../hpx/executors/fork_join_executor.hpp      | 164 +++++++++++++-----
 .../tests/regressions/CMakeLists.txt          |  10 +-
 .../regressions/fork_join_with_result.cpp     |  33 ++++
 .../tests/unit/fork_join_executor.cpp         | 147 +++++++++++++++-
 4 files changed, 308 insertions(+), 46 deletions(-)
 create mode 100644 libs/core/executors/tests/regressions/fork_join_with_result.cpp

diff --git a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp
index 381fa2b76084..e9eeba226295 100644
--- a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp
+++ b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp
@@ -123,6 +123,7 @@ namespace hpx::execution::experimental {
             void* element_function_;
             void const* shape_;
             void* argument_pack_;
+            void* results_;
         };
 
         // Can't apply 'using' here as the type needs to be forward
@@ -369,7 +370,7 @@ namespace hpx::execution::experimental {
                 region_data_[t].data_.state_.store(
                     thread_state::starting, std::memory_order_relaxed);
 
-                auto policy =
+                auto const policy =
                     launch::async_policy(priority_, stacksize_,
                         threads::thread_schedule_hint{
                             static_cast<std::int16_t>(t)});
@@ -501,7 +502,7 @@ namespace hpx::execution::experimental {
         // (additional arguments packed into a tuple) given to
         // bulk_sync_execute without wrapping it into hpx::function or
         // similar.
-        template <typename F, typename S, typename Tuple>
+        template <typename Result, typename F, typename S, typename Tuple>
         struct thread_function_helper
         {
             using argument_pack_type = std::decay_t<Tuple>;
@@ -509,10 +510,10 @@
 
             template <typename F_, typename A_, typename Tuple_,
                 std::size_t... Is_>
-            static constexpr void invoke_helper(
+            static constexpr decltype(auto) invoke_helper(
                 hpx::util::index_pack<Is_...>, F_&& f, A_&& a, Tuple_&& t)
             {
-                HPX_INVOKE(HPX_FORWARD(F_, f), HPX_FORWARD(A_, a),
+                return HPX_INVOKE(HPX_FORWARD(F_, f), HPX_FORWARD(A_, a),
                     hpx::get<Is_>(HPX_FORWARD(Tuple_, t))...);
             }
 
@@ -555,8 +556,19 @@
                     {
                         auto it = std::next(
                             hpx::util::begin(shape), part_begin);
-                        invoke_helper(index_pack_type{},
-                            element_function, *it, argument_pack);
+                        if constexpr (std::is_void_v<Result>)
+                        {
+                            invoke_helper(index_pack_type{},
+                                element_function, *it, argument_pack);
+                        }
+                        else
+                        {
+                            auto& results =
+                                *static_cast<Result*>(data.results_);
+                            results[part_begin] = invoke_helper(
+                                index_pack_type{}, element_function,
+                                *it, argument_pack);
+                        }
                     }
                 },
                 [&](std::exception_ptr&& ep) {
@@ -604,8 +616,19 @@
                     {
                         auto it =
                             std::next(hpx::util::begin(shape), *index);
-                        invoke_helper(index_pack_type{},
-                            element_function, *it, argument_pack);
+                        if constexpr (std::is_void_v<Result>)
+                        {
+                            invoke_helper(index_pack_type{},
+                                element_function, *it, argument_pack);
+                        }
+                        else
+                        {
+                            auto& results =
+                                *static_cast<Result*>(data.results_);
+                            results[*index] = invoke_helper(
+                                index_pack_type{}, element_function,
+                                *it, argument_pack);
+                        }
                     }
 
                     // As loop schedule is dynamic, steal from neighboring
@@ -630,8 +653,21 @@
                         {
                             auto it = std::next(
                                 hpx::util::begin(shape), *index);
-                            invoke_helper(index_pack_type{},
-                                element_function, *it, argument_pack);
+
+                            if constexpr (std::is_void_v<Result>)
+                            {
+                                invoke_helper(index_pack_type{},
+                                    element_function, *it,
+                                    argument_pack);
+                            }
+                            else
+                            {
+                                auto& results = *static_cast<Result*>(
+                                    data.results_);
+                                results[*index] = invoke_helper(
+                                    index_pack_type{}, element_function,
+                                    *it, argument_pack);
+                            }
                         }
                     }
                 },
@@ -709,19 +745,21 @@
             }
         };
 
-        template <typename F, typename S, typename Args>
+        template <typename Result, typename F, typename S, typename Args>
         thread_function_helper_type* set_all_states_and_region_data(
-            thread_state state, F& f, S const& shape,
+            void* results, thread_state state, F& f, S const& shape,
             Args& argument_pack) noexcept
         {
            thread_function_helper_type* func;
            if (schedule_ == loop_schedule::static_ || num_threads_ == 1)
            {
-                func = &thread_function_helper<F, S, Args>::call_static;
+                func = &thread_function_helper<Result, F, S,
+                    Args>::call_static;
            }
            else
            {
-                func = &thread_function_helper<F, S, Args>::call_dynamic;
+                func = &thread_function_helper<Result, F, S,
+                    Args>::call_dynamic;
            }
 
            for (std::size_t t = 0; t != num_threads_; ++t)
@@ -732,6 +770,7 @@
                 data.shape_ = &shape;
                 data.argument_pack_ = &argument_pack;
                 data.thread_function_helper_ = func;
+                data.results_ = results;
 
                 data.state_.store(state, std::memory_order_release);
             }
@@ -760,9 +799,27 @@
             return func;
         }
 
+        template <typename F>
+        void invoke_work(F&& f)
+        {
+            // Start work on the main thread.
+            f(region_data_, main_thread_, num_threads_, queues_,
+                exception_mutex_, exception_);
+
+            // Wait for all threads to finish their work assigned to
+            // them in this parallel region.
+            wait_state_all(thread_state::idle);
+
+            // rethrow exception, if any
+            if (exception_)
+            {
+                std::rethrow_exception(HPX_MOVE(exception_));
+            }
+        }
+
     public:
         template <typename F, typename S, typename... Ts>
-        void bulk_sync_execute(F&& f, S const& shape, Ts&&... ts)
+        decltype(auto) bulk_sync_execute(F&& f, S const& shape, Ts&&... ts)
         {
             // protect against nested use of this executor instance
             if (region_data_[main_thread_].data_.state_.load(
@@ -773,53 +830,77 @@
                     "fork_join_executor being used in nested ways?");
             }
 
+#if defined(HPX_HAVE_THREAD_DESCRIPTION)
             hpx::scoped_annotation annotate(
                 generate_annotation(hpx::get_worker_thread_num(),
                     "fork_join_executor::bulk_sync_execute"));
-
+#endif
             exception_ = std::exception_ptr();
 
             // Set the data for this parallel region
             auto argument_pack =
                 hpx::forward_as_tuple(HPX_FORWARD(Ts, ts)...);
 
-            // Signal all worker threads to start partitioning work for
-            // themselves, and then starting the actual work.
-            thread_function_helper_type* func =
-                set_all_states_and_region_data(
-                    thread_state::partitioning_work, f, shape,
-                    argument_pack);
+            using result_type =
+                hpx::parallel::execution::detail::bulk_execute_result_t<F,
+                    S, Ts...>;
 
-            // Start work on the main thread.
-            func(region_data_, main_thread_, num_threads_, queues_,
-                exception_mutex_, exception_);
+            if constexpr (std::is_void_v<result_type>)
+            {
+                // Signal all worker threads to start partitioning work for
+                // themselves, and then starting the actual work.
+                thread_function_helper_type* func =
+                    set_all_states_and_region_data<result_type>(nullptr,
+                        thread_state::partitioning_work, f, shape,
+                        argument_pack);
+
+                invoke_work(func);
+            }
+            else
+            {
+                result_type results(hpx::util::size(shape));
 
-            // Wait for all threads to finish their work assigned to
-            // them in this parallel region.
-            wait_state_all(thread_state::idle);
+                // Signal all worker threads to start partitioning work for
+                // themselves, and then starting the actual work.
+                thread_function_helper_type* func =
+                    set_all_states_and_region_data<result_type>(&results,
+                        thread_state::partitioning_work, f, shape,
+                        argument_pack);
 
-            // rethrow exception, if any
-            if (exception_)
-            {
-                std::rethrow_exception(HPX_MOVE(exception_));
+                invoke_work(func);
+
+                return results;
             }
         }
 
         template <typename F, typename S, typename... Ts>
-        hpx::future<void> bulk_async_execute(
-            F&& f, S const& shape, Ts&&... ts)
+        decltype(auto) bulk_async_execute(F&& f, S const& shape, Ts&&... ts)
         {
+            using result_type =
+                hpx::parallel::execution::detail::bulk_execute_result_t<F,
+                    S, Ts...>;
+
             // Forward to the synchronous version as we can't create
             // futures to the completion of the parallel region (this HPX
             // thread participates in computation).
             return hpx::detail::try_catch_exception_ptr(
                 [&]() {
-                    bulk_sync_execute(
-                        HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
-                    return hpx::make_ready_future();
+                    if constexpr (std::is_void_v<result_type>)
+                    {
+                        bulk_sync_execute(HPX_FORWARD(F, f), shape,
+                            HPX_FORWARD(Ts, ts)...);
+                        return hpx::make_ready_future();
+                    }
+                    else
+                    {
+                        auto&& result = bulk_sync_execute(HPX_FORWARD(F, f),
+                            shape, HPX_FORWARD(Ts, ts)...);
+                        return hpx::make_ready_future(HPX_MOVE(result));
+                    }
                 },
                 [&](std::exception_ptr&& ep) {
-                    return hpx::make_exceptional_future<void>(HPX_MOVE(ep));
+                    return hpx::make_exceptional_future<result_type>(
+                        HPX_MOVE(ep));
                 });
         }
 
@@ -838,10 +919,11 @@
             }
 
             // Set the data for this parallel region
+#if defined(HPX_HAVE_THREAD_DESCRIPTION)
             hpx::scoped_annotation annotate(
                 generate_annotation(hpx::get_worker_thread_num(),
                     "fork_join_executor::sync_invoke"));
-
+#endif
             exception_ = std::exception_ptr();
 
             auto args = hpx::make_tuple(first, size);
@@ -914,7 +996,7 @@
             hpx::parallel::execution::bulk_sync_execute_t,
             fork_join_executor const& exec, F&& f, S const& shape, Ts&&... ts)
         {
-            exec.shared_data_->bulk_sync_execute(
+            return exec.shared_data_->bulk_sync_execute(
                 HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
         }
 
@@ -956,7 +1038,7 @@
             hpx::parallel::execution::sync_invoke_t,
             fork_join_executor const& exec, F&& f, Fs&&... fs)
         {
-            exec.shared_data_->sync_invoke(
+            return exec.shared_data_->sync_invoke(
                 HPX_FORWARD(F, f), HPX_FORWARD(Fs, fs)...);
         }
 
diff --git a/libs/core/executors/tests/regressions/CMakeLists.txt b/libs/core/executors/tests/regressions/CMakeLists.txt
index ca6098a1b0e8..36c576b51898 100644
--- a/libs/core/executors/tests/regressions/CMakeLists.txt
+++ b/libs/core/executors/tests/regressions/CMakeLists.txt
@@ -5,8 +5,14 @@
 # Distributed under the Boost Software License, Version 1.0. (See accompanying
 # file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
-set(tests bulk_then_execute_3182 bulk_sync_wait future_then_async_executor
-    parallel_executor_1781 pu_count_6184 wrapping_executor
+set(tests
+    bulk_then_execute_3182
+    bulk_sync_wait
+    fork_join_with_result
+    future_then_async_executor
+    parallel_executor_1781
+    pu_count_6184
+    wrapping_executor
 )
 
 foreach(test ${tests})
diff --git a/libs/core/executors/tests/regressions/fork_join_with_result.cpp b/libs/core/executors/tests/regressions/fork_join_with_result.cpp
new file mode 100644
index 000000000000..ef53b4924b7b
--- /dev/null
+++ b/libs/core/executors/tests/regressions/fork_join_with_result.cpp
@@ -0,0 +1,33 @@
+//  Copyright (c) 2023 Giannis Gonidelis
+//
+//  SPDX-License-Identifier: BSL-1.0
+//  Distributed under the Boost Software License, Version 1.0. (See accompanying
+//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#include <hpx/execution.hpp>
+#include <hpx/local/init.hpp>
+#include <hpx/modules/testing.hpp>
+#include <hpx/numeric.hpp>
+
+#include <hpx/iterator_support/counting_iterator.hpp>
+
+int hpx_main()
+{
+    hpx::execution::experimental::fork_join_executor exec{};
+
+    auto const result = hpx::transform_reduce(hpx::execution::par.on(exec),
+        hpx::util::counting_iterator(0), hpx::util::counting_iterator(100), 0L,
+        std::plus{}, [&](auto i) { return i * i; });
+
+    HPX_TEST_EQ(result, 328350L);
+
+    return hpx::local::finalize();
+}
+
+int main(int argc, char* argv[])
+{
+    HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
+        "HPX main exited with non-zero status");
+
+    return hpx::util::report_errors();
+}
diff --git a/libs/core/executors/tests/unit/fork_join_executor.cpp b/libs/core/executors/tests/unit/fork_join_executor.cpp
index c1d9314e415a..6715b0d79b4c 100644
--- a/libs/core/executors/tests/unit/fork_join_executor.cpp
+++ b/libs/core/executors/tests/unit/fork_join_executor.cpp
@@ -84,7 +84,7 @@ void test_bulk_async(ExecutorArgs&&... args)
     std::cerr << "test_bulk_async\n";
 
     count1 = 0;
-    std::size_t const n = 107;
+    constexpr std::size_t n = 107;
     std::vector<int> v(n);
     std::iota(std::begin(v), std::end(v), std::rand());
 
@@ -103,6 +103,70 @@
     HPX_TEST_EQ(count1.load(), 2 * n);
 }
 
+///////////////////////////////////////////////////////////////////////////////
+int bulk_test_with_result(int val, int passed_through)    //-V813
+{
+    ++count1;
+    HPX_TEST_EQ(passed_through, 42);
+    return val;
+}
+
+template <typename... ExecutorArgs>
+void test_bulk_sync_with_result(ExecutorArgs&&... args)
+{
+    std::cerr << "test_bulk_sync_with_result\n";
+
+    count1 = 0;
+    constexpr std::size_t n = 107;
+    std::vector<int> v(n);
+    std::iota(std::begin(v), std::end(v), std::rand());
+
+    using hpx::placeholders::_1;
+    using hpx::placeholders::_2;
+
+    fork_join_executor exec{std::forward<ExecutorArgs>(args)...};
+    std::vector<int> const result1 =
+        hpx::parallel::execution::bulk_sync_execute(
+            exec, hpx::bind(&bulk_test_with_result, _1, _2), v, 42);
+    HPX_TEST_EQ(count1.load(), n);
+    HPX_TEST(result1 == v);
+
+    std::vector<int> const result2 =
+        hpx::parallel::execution::bulk_sync_execute(
+            exec, &bulk_test_with_result, v, 42);
+    HPX_TEST_EQ(count1.load(), 2 * n);
+    HPX_TEST(result2 == v);
+}
+
+template <typename... ExecutorArgs>
+void test_bulk_async_with_result(ExecutorArgs&&... args)
+{
+    std::cerr << "test_bulk_async_with_result\n";
+
+    count1 = 0;
+    constexpr std::size_t n = 107;
+    std::vector<int> v(n);
+    std::iota(std::begin(v), std::end(v), std::rand());
+
+    using hpx::placeholders::_1;
+    using hpx::placeholders::_2;
+
+    fork_join_executor exec{std::forward<ExecutorArgs>(args)...};
+    std::vector<int> const result1 =
+        hpx::parallel::execution::bulk_async_execute(
+            exec, hpx::bind(&bulk_test_with_result, _1, _2), v, 42)
+            .get();
+    HPX_TEST_EQ(count1.load(), n);
+    HPX_TEST(result1 == v);
+
+    std::vector<int> const result2 =
+        hpx::parallel::execution::bulk_async_execute(
+            exec, &bulk_test_with_result, v, 42)
+            .get();
+    HPX_TEST_EQ(count1.load(), 2 * n);
+    HPX_TEST(result2 == v);
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 void bulk_test_exception(int, int passed_through)    //-V813
 {
@@ -116,7 +180,7 @@ void test_bulk_sync_exception(ExecutorArgs&&... args)
     std::cerr << "test_bulk_sync_exception\n";
 
     count1 = 0;
-    std::size_t const n = 107;
+    constexpr std::size_t n = 107;
     std::vector<int> v(n);
     std::iota(std::begin(v), std::end(v), std::rand());
 
@@ -147,7 +211,7 @@ void test_bulk_async_exception(ExecutorArgs&&... args)
     std::cerr << "test_bulk_async_exception\n";
 
     count1 = 0;
-    std::size_t const n = 107;
+    constexpr std::size_t n = 107;
    std::vector<int> v(n);
    std::iota(std::begin(v), std::end(v), std::rand());
 
@@ -173,6 +237,78 @@ void test_bulk_async_exception(ExecutorArgs&&... args)
     HPX_TEST(caught_exception);
 }
 
+///////////////////////////////////////////////////////////////////////////////
+int bulk_test_exception_with_result(int, int passed_through)    //-V813
+{
+    HPX_TEST_EQ(passed_through, 42);
+    throw std::runtime_error("test");
+}
+
+template <typename... ExecutorArgs>
+void test_bulk_sync_exception_with_result(ExecutorArgs&&... args)
+{
+    std::cerr << "test_bulk_sync_exception_with_result\n";
+
+    count1 = 0;
+    constexpr std::size_t n = 107;
+    std::vector<int> v(n);
+    std::iota(std::begin(v), std::end(v), std::rand());
+
+    fork_join_executor exec{std::forward<ExecutorArgs>(args)...};
+    bool caught_exception = false;
+    try
+    {
+        [[maybe_unused]] std::vector<int> const result =
+            hpx::parallel::execution::bulk_sync_execute(
+                exec, &bulk_test_exception_with_result, v, 42);
+
+        HPX_TEST(false);
+    }
+    catch (std::runtime_error const& /*e*/)
+    {
+        caught_exception = true;
+    }
+    catch (...)
+    {
+        HPX_TEST(false);
+    }
+
+    HPX_TEST(caught_exception);
+}
+
+template <typename... ExecutorArgs>
+void test_bulk_async_exception_with_result(ExecutorArgs&&... args)
+{
+    std::cerr << "test_bulk_async_exception_with_result\n";
+
+    count1 = 0;
+    constexpr std::size_t n = 107;
+    std::vector<int> v(n);
+    std::iota(std::begin(v), std::end(v), std::rand());
+
+    fork_join_executor exec{std::forward<ExecutorArgs>(args)...};
+    bool caught_exception = false;
+    try
+    {
+        auto r = hpx::parallel::execution::bulk_async_execute(
+            exec, &bulk_test_exception_with_result, v, 42);
+        [[maybe_unused]] std::vector<int> const result = r.get();
+
+        HPX_TEST(false);
+    }
+    catch (std::runtime_error const& /*e*/)
+    {
+        caught_exception = true;
+    }
+    catch (...)
+    {
+        HPX_TEST(false);
+    }
+
+    HPX_TEST(caught_exception);
+}
+
+///////////////////////////////////////////////////////////////////////////////
 template <typename... ExecutorArgs>
 void test_invoke_sync_homogeneous(ExecutorArgs&&... args)
 {
@@ -322,6 +458,11 @@ void test_executor(hpx::threads::thread_priority priority,
     test_bulk_sync_exception(priority, stacksize, schedule);
     test_bulk_async_exception(priority, stacksize, schedule);
 
+    test_bulk_sync_with_result(priority, stacksize, schedule);
+    test_bulk_async_with_result(priority, stacksize, schedule);
+    test_bulk_sync_exception_with_result(priority, stacksize, schedule);
+    test_bulk_async_exception_with_result(priority, stacksize, schedule);
+
     test_invoke_sync_homogeneous(priority, stacksize, schedule);
     test_invoke_sync(priority, stacksize, schedule);
     test_invoke_sync_homogeneous_exception(priority, stacksize, schedule);
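---

A minimal usage sketch of what this patch enables (not part of the patch itself, and modeled on the unit tests above): bulk_sync_execute on a fork_join_executor now collects one result per shape element into a vector instead of returning void, and bulk_async_execute wraps the same vector in a future. The exact header set and the doubling lambda below are illustrative assumptions, not code from the patch.

// Illustrative sketch only; mirrors the calls made in the patch's tests.
// The header choices are assumptions about what a standalone translation
// unit would need.
#include <hpx/executors/fork_join_executor.hpp>
#include <hpx/local/init.hpp>

#include <cassert>
#include <numeric>
#include <vector>

int hpx_main()
{
    hpx::execution::experimental::fork_join_executor exec{};

    std::vector<int> v(100);
    std::iota(v.begin(), v.end(), 0);

    // The element function returns int, so bulk_sync_execute now yields a
    // std::vector<int> with one entry per shape element (previously the
    // return values were discarded).
    std::vector<int> const doubled =
        hpx::parallel::execution::bulk_sync_execute(
            exec, [](int i, int factor) { return factor * i; }, v, 2);
    assert(doubled[10] == 20);

    // The asynchronous variant forwards to the synchronous one and wraps
    // the result vector in a (ready) future.
    auto f = hpx::parallel::execution::bulk_async_execute(
        exec, [](int i, int factor) { return factor * i; }, v, 2);
    assert(f.get() == doubled);

    return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
    return hpx::local::init(hpx_main, argc, argv);
}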