Enable fork_join_executor to handle return values from scheduled functions
hkaiser committed Nov 9, 2023
1 parent 3072a57 commit 59f8495
Showing 4 changed files with 314 additions and 46 deletions.
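In short, bulk_sync_execute on a fork_join_executor now returns the values produced by the scheduled function instead of discarding them. A minimal usage sketch, not part of this commit: it assumes the hpx::parallel::execution::bulk_sync_execute customization point shown in the diff below, and the shape and lambda are made up for illustration.

#include <hpx/execution.hpp>
#include <hpx/init.hpp>

#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

int hpx_main()
{
    hpx::execution::experimental::fork_join_executor exec{};

    // One shape element per invocation; the lambda returns a value per element.
    std::vector<std::size_t> shape(100);
    std::iota(shape.begin(), shape.end(), std::size_t(0));

    // Previously this returned void; with this commit it returns a container
    // holding one result per shape element.
    auto results = hpx::parallel::execution::bulk_sync_execute(
        exec, [](std::size_t i) { return i * i; }, shape);

    std::cout << results.size() << " results collected\n";

    return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
    return hpx::local::init(hpx_main, argc, argv);
}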
170 changes: 129 additions & 41 deletions libs/core/executors/include/hpx/executors/fork_join_executor.hpp
@@ -123,6 +123,7 @@ namespace hpx::execution::experimental {
void* element_function_;
void const* shape_;
void* argument_pack_;
void* results_;
};

// Can't apply 'using' here as the type needs to be forward
@@ -369,7 +370,7 @@ namespace hpx::execution::experimental {
region_data_[t].data_.state_.store(
thread_state::starting, std::memory_order_relaxed);

auto policy =
auto const policy =
launch::async_policy(priority_, stacksize_,
threads::thread_schedule_hint{
static_cast<std::int16_t>(t)});
@@ -501,18 +502,18 @@
// (additional arguments packed into a tuple) given to
// bulk_sync_execute without wrapping it into hpx::function or
// similar.
template <typename F, typename S, typename Tuple>
template <typename Result, typename F, typename S, typename Tuple>
struct thread_function_helper
{
using argument_pack_type = std::decay_t<Tuple>;
using index_pack_type = hpx::detail::fused_index_pack_t<Tuple>;

template <std::size_t... Is_, typename F_, typename A_,
typename Tuple_>
static constexpr void invoke_helper(
static constexpr decltype(auto) invoke_helper(
hpx::util::index_pack<Is_...>, F_&& f, A_&& a, Tuple_&& t)
{
HPX_INVOKE(HPX_FORWARD(F_, f), HPX_FORWARD(A_, a),
return HPX_INVOKE(HPX_FORWARD(F_, f), HPX_FORWARD(A_, a),
hpx::get<Is_>(HPX_FORWARD(Tuple_, t))...);
}

@@ -555,8 +556,19 @@
{
auto it = std::next(
hpx::util::begin(shape), part_begin);
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
if constexpr (std::is_void_v<Result>)
{
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
}
else
{
auto& results =
*static_cast<Result*>(data.results_);
results[part_begin] = invoke_helper(
index_pack_type{}, element_function,
*it, argument_pack);
}
}
},
[&](std::exception_ptr&& ep) {
@@ -604,8 +616,19 @@
{
auto it =
std::next(hpx::util::begin(shape), *index);
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
if constexpr (std::is_void_v<Result>)
{
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);
}
else
{
auto& results =
*static_cast<Result*>(data.results_);
results[*index] = invoke_helper(
index_pack_type{}, element_function,
*it, argument_pack);
}
}

// As loop schedule is dynamic, steal from neighboring
@@ -630,8 +653,21 @@
{
auto it = std::next(
hpx::util::begin(shape), *index);
invoke_helper(index_pack_type{},
element_function, *it, argument_pack);

if constexpr (std::is_void_v<Result>)
{
invoke_helper(index_pack_type{},
element_function, *it,
argument_pack);
}
else
{
auto& results = *static_cast<Result*>(
data.results_);
results[*index] = invoke_helper(
index_pack_type{}, element_function,
*it, argument_pack);
}
}
}
},
@@ -709,19 +745,21 @@
}
};

template <typename F, typename S, typename Args>
template <typename Result, typename F, typename S, typename Args>
thread_function_helper_type* set_all_states_and_region_data(
thread_state state, F& f, S const& shape,
void* results, thread_state state, F& f, S const& shape,
Args& argument_pack) noexcept
{
thread_function_helper_type* func;
if (schedule_ == loop_schedule::static_ || num_threads_ == 1)
{
func = &thread_function_helper<F, S, Args>::call_static;
func = &thread_function_helper<Result, F, S,
Args>::call_static;
}
else
{
func = &thread_function_helper<F, S, Args>::call_dynamic;
func = &thread_function_helper<Result, F, S,
Args>::call_dynamic;
}

for (std::size_t t = 0; t != num_threads_; ++t)
@@ -732,6 +770,7 @@
data.shape_ = &shape;
data.argument_pack_ = &argument_pack;
data.thread_function_helper_ = func;
data.results_ = results;

data.state_.store(state, std::memory_order_release);
}
@@ -762,7 +801,7 @@

public:
template <typename F, typename S, typename... Ts>
void bulk_sync_execute(F&& f, S const& shape, Ts&&... ts)
decltype(auto) bulk_sync_execute(F&& f, S const& shape, Ts&&... ts)
{
// protect against nested use of this executor instance
if (region_data_[main_thread_].data_.state_.load(
@@ -773,53 +812,101 @@
"fork_join_executor being used in nested ways?");
}

#if defined(HPX_HAVE_THREAD_DESCRIPTION)
hpx::scoped_annotation annotate(
generate_annotation(hpx::get_worker_thread_num(),
"fork_join_executor::bulk_sync_execute"));

#endif
exception_ = std::exception_ptr();

// Set the data for this parallel region
auto argument_pack =
hpx::forward_as_tuple(HPX_FORWARD(Ts, ts)...);

// Signal all worker threads to start partitioning work for
// themselves, and then starting the actual work.
thread_function_helper_type* func =
set_all_states_and_region_data(
thread_state::partitioning_work, f, shape,
argument_pack);
using result_type =
hpx::parallel::execution::detail::bulk_execute_result_t<F,
S, Ts...>;

// Start work on the main thread.
func(region_data_, main_thread_, num_threads_, queues_,
exception_mutex_, exception_);
if constexpr (std::is_void_v<result_type>)
{
// Signal all worker threads to start partitioning work for
// themselves, and then starting the actual work.
thread_function_helper_type* func =
set_all_states_and_region_data<void>(nullptr,
thread_state::partitioning_work, f, shape,
argument_pack);

// Start work on the main thread.
func(region_data_, main_thread_, num_threads_, queues_,
exception_mutex_, exception_);

// Wait for all threads to finish their work assigned to
// them in this parallel region.
wait_state_all(thread_state::idle);

// rethrow exception, if any
if (exception_)
{
std::rethrow_exception(HPX_MOVE(exception_));
}
}
else
{
result_type results(hpx::util::size(shape));

// Wait for all threads to finish their work assigned to
// them in this parallel region.
wait_state_all(thread_state::idle);
// Signal all worker threads to start partitioning work for
// themselves, and then starting the actual work.
thread_function_helper_type* func =
set_all_states_and_region_data<result_type>(&results,
thread_state::partitioning_work, f, shape,
argument_pack);

// rethrow exception, if any
if (exception_)
{
std::rethrow_exception(HPX_MOVE(exception_));
// Start work on the main thread.
func(region_data_, main_thread_, num_threads_, queues_,
exception_mutex_, exception_);

// Wait for all threads to finish their work assigned to
// them in this parallel region.
wait_state_all(thread_state::idle);

// rethrow exception, if any
if (exception_)
{
std::rethrow_exception(HPX_MOVE(exception_));
}

return results;
}
}

template <typename F, typename S, typename... Ts>
hpx::future<void> bulk_async_execute(
F&& f, S const& shape, Ts&&... ts)
decltype(auto) bulk_async_execute(F&& f, S const& shape, Ts&&... ts)
{
using result_type =
hpx::parallel::execution::detail::bulk_execute_result_t<F,
S, Ts...>;

// Forward to the synchronous version as we can't create
// futures to the completion of the parallel region (this HPX
// thread participates in computation).
return hpx::detail::try_catch_exception_ptr(
[&]() {
bulk_sync_execute(
HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
return hpx::make_ready_future();
if constexpr (std::is_void_v<result_type>)
{
bulk_sync_execute(HPX_FORWARD(F, f), shape,
HPX_FORWARD(Ts, ts)...);
return hpx::make_ready_future();
}
else
{
auto&& result = bulk_sync_execute(HPX_FORWARD(F, f),
shape, HPX_FORWARD(Ts, ts)...);
return hpx::make_ready_future(HPX_MOVE(result));
}
},
[&](std::exception_ptr&& ep) {
return hpx::make_exceptional_future<void>(HPX_MOVE(ep));
return hpx::make_exceptional_future<result_type>(
HPX_MOVE(ep));
});
}

@@ -838,10 +925,11 @@
}

// Set the data for this parallel region
#if defined(HPX_HAVE_THREAD_DESCRIPTION)
hpx::scoped_annotation annotate(
generate_annotation(hpx::get_worker_thread_num(),
"fork_join_executor::sync_invoke"));

#endif
exception_ = std::exception_ptr();

auto args = hpx::make_tuple(first, size);
@@ -914,7 +1002,7 @@
hpx::parallel::execution::bulk_sync_execute_t,
fork_join_executor const& exec, F&& f, S const& shape, Ts&&... ts)
{
exec.shared_data_->bulk_sync_execute(
return exec.shared_data_->bulk_sync_execute(
HPX_FORWARD(F, f), shape, HPX_FORWARD(Ts, ts)...);
}

@@ -956,7 +1044,7 @@
hpx::parallel::execution::sync_invoke_t,
fork_join_executor const& exec, F&& f, Fs&&... fs)
{
exec.shared_data_->sync_invoke(
return exec.shared_data_->sync_invoke(
HPX_FORWARD(F, f), HPX_FORWARD(Fs, fs)...);
}

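The mechanism this file adds, reduced to a standalone sketch: the caller allocates the result container up front (one slot per shape element), stores a type-erased pointer to it in the shared region data (the new results_ member), and each worker casts the pointer back and writes results[index] for the indices it processes; disjoint index ranges make extra synchronization unnecessary. The names below are illustrative stand-ins, not the HPX internals.

#include <cstddef>
#include <thread>
#include <type_traits>
#include <vector>

// Stand-in for the executor's shared region data: the results pointer is
// type-erased (void*) so the same struct works for any result type.
struct region_data
{
    void* results_ = nullptr;
};

template <typename Result, typename F, typename Shape>
void worker(region_data& data, F& f, Shape const& shape, std::size_t begin,
    std::size_t end)
{
    for (std::size_t i = begin; i != end; ++i)
    {
        if constexpr (std::is_void_v<Result>)
        {
            f(shape[i]);    // nothing to store, as before
        }
        else
        {
            // Each worker owns a disjoint index range, so writing through
            // the shared pointer needs no locking.
            auto& results = *static_cast<Result*>(data.results_);
            results[i] = f(shape[i]);
        }
    }
}

int main()
{
    std::vector<int> shape{1, 2, 3, 4, 5, 6, 7, 8};
    auto f = [](int x) { return x * x; };

    using result_type = std::vector<int>;
    result_type results(shape.size());    // sized up front by the caller

    region_data data;
    data.results_ = &results;

    // Two workers over disjoint halves of the shape (the real executor uses
    // its worker threads and loop schedule instead).
    std::thread t([&] { worker<result_type>(data, f, shape, 0, 4); });
    worker<result_type>(data, f, shape, 4, 8);
    t.join();

    // results now holds {1, 4, 9, 16, 25, 36, 49, 64}.
}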
10 changes: 8 additions & 2 deletions libs/core/executors/tests/regressions/CMakeLists.txt
@@ -5,8 +5,14 @@
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests bulk_then_execute_3182 bulk_sync_wait future_then_async_executor
parallel_executor_1781 pu_count_6184 wrapping_executor
set(tests
bulk_then_execute_3182
bulk_sync_wait
fork_join_with_result
future_then_async_executor
parallel_executor_1781
pu_count_6184
wrapping_executor
)

foreach(test ${tests})
33 changes: 33 additions & 0 deletions libs/core/executors/tests/regressions/fork_join_with_result.cpp
@@ -0,0 +1,33 @@
// Copyright (c) 2023 Giannis Gonidelis
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/algorithm.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

#include <functional>

int hpx_main()
{
hpx::execution::experimental::fork_join_executor exec{};

auto const result = hpx::transform_reduce(hpx::execution::par.on(exec),
hpx::util::counting_iterator(0), hpx::util::counting_iterator(100), 0L,
std::plus{}, [&](auto i) { return i * i; });

HPX_TEST_EQ(result, 328350L);

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0,
"HPX main exited with non-zero status");

return hpx::util::report_errors();
}
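For completeness, the asynchronous path changes the same way: as the comment in the diff notes, the parallel region still runs on the calling thread, but the ready future now carries the collected results instead of void. A minimal sketch, assuming the corresponding hpx::parallel::execution::bulk_async_execute customization point (shape and callable are made up):

#include <hpx/execution.hpp>
#include <hpx/init.hpp>

#include <cstddef>
#include <numeric>
#include <vector>

int hpx_main()
{
    hpx::execution::experimental::fork_join_executor exec{};

    std::vector<std::size_t> shape(8);
    std::iota(shape.begin(), shape.end(), std::size_t(0));

    // Runs synchronously on this thread and returns a ready future that now
    // holds one result per shape element (previously hpx::future<void>).
    auto fut = hpx::parallel::execution::bulk_async_execute(
        exec, [](std::size_t i) { return i + 1; }, shape);

    auto results = fut.get();    // {1, 2, ..., 8}

    return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
    return hpx::local::init(hpx_main, argc, argv);
}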