From 368ea21e81b8a2e19c54d057a53066127e00ebc2 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Tue, 14 May 2024 09:22:02 -0500 Subject: [PATCH] Fixing handling of bool value type for collective operations --- .../include/hpx/collectives/all_reduce.hpp | 7 +- .../hpx/collectives/exclusive_scan.hpp | 4 +- .../include/hpx/collectives/reduce.hpp | 13 +- .../tests/regressions/CMakeLists.txt | 2 + .../tests/regressions/reduce_vector_bool.cpp | 157 ++++++++++++++++++ 5 files changed, 175 insertions(+), 8 deletions(-) create mode 100644 libs/full/collectives/tests/regressions/reduce_vector_bool.cpp diff --git a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp index 1fd006794ffa..e87d647129b5 100644 --- a/libs/full/collectives/include/hpx/collectives/all_reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/all_reduce.hpp @@ -172,8 +172,11 @@ namespace hpx::traits { { // compute reduction result only once auto it = data.begin(); - data[0] = hpx::reduce( - ++it, data.end(), data[0], HPX_FORWARD(F, op)); + data[0] = Communicator::template handle_bool< + std::decay_t>(hpx::reduce(++it, data.end(), + Communicator::template handle_bool>( + data[0]), + HPX_FORWARD(F, op))); data_available = true; } return Communicator::template handle_bool>( diff --git a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp index 929aad09f55b..55a3415e704e 100644 --- a/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp +++ b/libs/full/collectives/include/hpx/collectives/exclusive_scan.hpp @@ -187,7 +187,9 @@ namespace hpx::traits { // first value is not taken into account auto it = data.begin(); - hpx::exclusive_scan(it, data.end(), dest.begin(), *it, + hpx::exclusive_scan(it, data.end(), dest.begin(), + Communicator::template handle_bool>( + *it), HPX_FORWARD(F, op)); std::swap(data, dest); diff --git a/libs/full/collectives/include/hpx/collectives/reduce.hpp b/libs/full/collectives/include/hpx/collectives/reduce.hpp index 6b1725954fab..80126f94eb04 100644 --- a/libs/full/collectives/include/hpx/collectives/reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/reduce.hpp @@ -257,19 +257,22 @@ namespace hpx::traits { communication::reduce_tag>::name(), which, generation, // step function (invoked once for get) - [&t](auto& data, std::size_t which) { - data[which] = HPX_FORWARD(T, t); + [&t](auto& data, std::size_t site) { + data[site] = HPX_FORWARD(T, t); }, // finalizer (invoked once after all data has been received) [op = HPX_FORWARD(F, op)]( auto& data, bool&, std::size_t) mutable { HPX_ASSERT(!data.empty()); + if (data.size() > 1) { auto it = data.begin(); return Communicator::template handle_bool< std::decay_t>(hpx::reduce(++it, data.end(), - HPX_MOVE(data[0]), HPX_FORWARD(F, op))); + Communicator::template handle_bool>( + HPX_MOVE(data[0])), + HPX_FORWARD(F, op))); } return Communicator::template handle_bool>( HPX_MOVE(data[0])); @@ -285,8 +288,8 @@ namespace hpx::traits { communication::reduce_tag>::name(), which, generation, // step function (invoked for each set) - [t = HPX_FORWARD(T, t)](auto& data, std::size_t which) mutable { - data[which] = HPX_FORWARD(T, t); + [t = HPX_FORWARD(T, t)](auto& data, std::size_t site) mutable { + data[site] = HPX_FORWARD(T, t); }, // no finalizer nullptr); diff --git a/libs/full/collectives/tests/regressions/CMakeLists.txt b/libs/full/collectives/tests/regressions/CMakeLists.txt index 385afd292e4e..5e13f8f77533 100644 --- a/libs/full/collectives/tests/regressions/CMakeLists.txt +++ b/libs/full/collectives/tests/regressions/CMakeLists.txt @@ -12,6 +12,7 @@ if(HPX_WITH_NETWORKING) broadcast_wait_for_2822 collectives_bool_5940 multiple_gather_ops_2001 + reduce_vector_bool trivially_copyable_all_gather ) @@ -22,6 +23,7 @@ if(HPX_WITH_NETWORKING) set(barrier_3792_PARAMETERS LOCALITIES 3 THREADS_PER_LOCALITY 1) set(collectives_bool_5940_PARAMETERS LOCALITIES 2) set(multiple_gather_ops_2001_PARAMETERS LOCALITIES 2) + set(reduce_vector_bool_2001_PARAMETERS LOCALITIES 2) foreach(test ${tests}) set(sources ${test}.cpp) diff --git a/libs/full/collectives/tests/regressions/reduce_vector_bool.cpp b/libs/full/collectives/tests/regressions/reduce_vector_bool.cpp new file mode 100644 index 000000000000..a03f7b229625 --- /dev/null +++ b/libs/full/collectives/tests/regressions/reduce_vector_bool.cpp @@ -0,0 +1,157 @@ +// Copyright (c) 2019-2024 Hartmut Kaiser +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if !defined(HPX_COMPUTE_DEVICE_CODE) +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace hpx::collectives; + +constexpr char const* reduce_direct_basename = "/test/reduce_direct/"; +#if defined(HPX_DEBUG) +constexpr int ITERATIONS = 100; +#else +constexpr int ITERATIONS = 1000; +#endif + +void test_multiple_use_with_generation() +{ + std::uint32_t const this_locality = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const reduce_direct_client = + create_communicator(reduce_direct_basename, + num_sites_arg(num_localities), this_site_arg(this_locality)); + + hpx::chrono::high_resolution_timer const t; + + for (int i = 0; i != ITERATIONS; ++i) + { + bool value = ((this_locality + i) % 2) ? true : false; + if (this_locality == 0) + { + hpx::future overall_result = + reduce_here(reduce_direct_client, std::move(value), + std::logical_or<>{}, generation_arg(i + 1)); + + bool sum = false; + for (std::uint32_t j = 0; j != num_localities; ++j) + { + sum = sum || (((j + i) % 2) ? true : false); + } + HPX_TEST_EQ(sum, overall_result.get()); + } + else + { + hpx::future overall_result = reduce_there( + reduce_direct_client, std::move(value), generation_arg(i + 1)); + overall_result.get(); + } + } + + auto const elapsed = t.elapsed(); + if (this_locality == 0) + { + std::cout << "remote timing: " << elapsed / ITERATIONS << "[s]\n"; + } +} + +void test_local_use() +{ + constexpr std::uint32_t num_sites = 10; + + std::vector> sites; + sites.reserve(num_sites); + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([=]() { + auto const reduce_direct_client = + create_communicator(reduce_direct_basename, + num_sites_arg(num_sites), this_site_arg(site)); + + hpx::chrono::high_resolution_timer const t; + + // test functionality based on immediate local result value + for (int i = 0; i != ITERATIONS; ++i) + { + bool value = ((site + i) % 2) ? true : false; + if (site == 0) + { + hpx::future overall_result = reduce_here( + reduce_direct_client, std::move(value), std::logical_or<>{}, + generation_arg(i + 1), this_site_arg(site)); + + bool sum = false; + for (std::uint32_t j = 0; j != num_sites; ++j) + { + sum = sum || (((j + i) % 2) ? true : false); + } + HPX_TEST_EQ(sum, overall_result.get()); + } + else + { + hpx::future overall_result = + reduce_there(reduce_direct_client, std::move(value), + generation_arg(i + 1), this_site_arg(site)); + overall_result.get(); + } + } + + auto const elapsed = t.elapsed(); + if (site == 0) + { + std::cout << "local timing: " << elapsed / (10 * ITERATIONS) + << "[s]\n"; + } + })); + } + + hpx::wait_all(std::move(sites)); +} + +int hpx_main() +{ +#if defined(HPX_HAVE_NETWORKING) + if (hpx::get_num_localities(hpx::launch::sync) > 1) + { + test_multiple_use_with_generation(); + } +#endif + + if (hpx::get_locality_id() == 0) + { + test_local_use(); + } + + return hpx::finalize(); +} + +int main(int argc, char* argv[]) +{ + std::vector const cfg = {"hpx.run_hpx_main!=1"}; + + hpx::init_params init_args; + init_args.cfg = cfg; + + HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0); + return hpx::util::report_errors(); +} + +#endif