From aa829ea4d29d65107639a819b7fb2e376a03225e Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Sun, 5 Jan 2025 16:03:06 -0600 Subject: [PATCH] Adding synchronous collective operations - adding predefined world_comunicator --- .../include/hpx/collectives/broadcast.hpp | 88 ++++- .../hpx/collectives/create_communicator.hpp | 4 + .../include/hpx/collectives/reduce.hpp | 93 +++++ .../collectives/src/create_communicator.cpp | 18 + .../collectives/tests/unit/CMakeLists.txt | 4 +- .../collectives/tests/unit/broadcast_sync.cpp | 302 ++++++++++++++++ .../collectives/tests/unit/reduce_sync.cpp | 326 ++++++++++++++++++ 7 files changed, 833 insertions(+), 2 deletions(-) create mode 100644 libs/full/collectives/tests/unit/broadcast_sync.cpp create mode 100644 libs/full/collectives/tests/unit/reduce_sync.cpp diff --git a/libs/full/collectives/include/hpx/collectives/broadcast.hpp b/libs/full/collectives/include/hpx/collectives/broadcast.hpp index fdde88435c9e..e30dd09dacc2 100644 --- a/libs/full/collectives/include/hpx/collectives/broadcast.hpp +++ b/libs/full/collectives/include/hpx/collectives/broadcast.hpp @@ -200,7 +200,6 @@ namespace hpx { namespace collectives { #include #include #include -#include #include #include #include @@ -334,6 +333,39 @@ namespace hpx::collectives { HPX_FORWARD(T, local_result), this_site); } + //////////////////////////////////////////////////////////////////////////// + template + decltype(auto) broadcast_to(hpx::launch::sync_policy, communicator fid, + T&& local_result, this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + return broadcast_to( + HPX_MOVE(fid), HPX_FORWARD(T, local_result), this_site, generation) + .get(); + } + + template + decltype(auto) broadcast_to(hpx::launch::sync_policy, communicator fid, + T&& local_result, generation_arg generation, + this_site_arg this_site = this_site_arg()) + { + return broadcast_to( + HPX_MOVE(fid), HPX_FORWARD(T, local_result), this_site, generation) + .get(); + } + + template + decltype(auto) broadcast_to(hpx::launch::sync_policy, char const* basename, + T&& local_result, num_sites_arg num_sites = num_sites_arg(), + this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + return broadcast_to(hpx::launch::sync, + create_communicator(basename, num_sites, this_site, generation, + root_site_arg(this_site.argument_)), + HPX_FORWARD(T, local_result), this_site); + } + /////////////////////////////////////////////////////////////////////////// template hpx::future broadcast_from(communicator fid, @@ -392,6 +424,60 @@ namespace hpx::collectives { this_site, generation, root_site), this_site); } + + /////////////////////////////////////////////////////////////////////////// + template + T broadcast_from(hpx::launch::sync_policy, communicator fid, + this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + return broadcast_from(HPX_MOVE(fid), this_site, generation).get(); + } + + template + T broadcast_from(hpx::launch::sync_policy, communicator fid, + generation_arg generation, this_site_arg this_site = this_site_arg()) + { + return broadcast_from(HPX_MOVE(fid), this_site, generation).get(); + } + + template + T broadcast_from(hpx::launch::sync_policy, char const* basename, + this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg(), + root_site_arg root_site = root_site_arg()) + { + HPX_ASSERT(this_site != root_site); + return broadcast_from(create_communicator(basename, num_sites_arg(), + this_site, generation, root_site), + this_site) + .get(); + } + + /////////////////////////////////////////////////////////////////////////// + template + void broadcast(communicator fid, T& value, + this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + if (this_site == static_cast(-1)) + { + this_site = static_cast(agas::get_locality_id()); + } + + fid.wait(); // make sure communicator was created + + if (this_site == fid.get_info().second) + { + broadcast_to( + hpx::launch::sync, HPX_MOVE(fid), value, this_site, generation); + } + else + { + value = broadcast_from( + hpx::launch::sync, HPX_MOVE(fid), this_site, generation); + } + } } // namespace hpx::collectives //////////////////////////////////////////////////////////////////////////////// diff --git a/libs/full/collectives/include/hpx/collectives/create_communicator.hpp b/libs/full/collectives/include/hpx/collectives/create_communicator.hpp index d64b0bd7f255..7aa5837354db 100644 --- a/libs/full/collectives/include/hpx/collectives/create_communicator.hpp +++ b/libs/full/collectives/include/hpx/collectives/create_communicator.hpp @@ -185,6 +185,10 @@ namespace hpx::collectives { } }; + /////////////////////////////////////////////////////////////////////////// + // Predefined global communicator + HPX_EXPORT communicator get_world_communicator(); + /////////////////////////////////////////////////////////////////////////// HPX_EXPORT communicator create_communicator(char const* basename, num_sites_arg num_sites = num_sites_arg(), diff --git a/libs/full/collectives/include/hpx/collectives/reduce.hpp b/libs/full/collectives/include/hpx/collectives/reduce.hpp index 0d17ffdbb86c..0d2d9164e1ba 100644 --- a/libs/full/collectives/include/hpx/collectives/reduce.hpp +++ b/libs/full/collectives/include/hpx/collectives/reduce.hpp @@ -381,6 +381,39 @@ namespace hpx::collectives { HPX_FORWARD(T, result), HPX_FORWARD(F, op), this_site); } + /////////////////////////////////////////////////////////////////////////// + template + decltype(auto) reduce_here(hpx::launch::sync_policy, communicator fid, + T&& local_result, F&& op, this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + return reduce_here(HPX_MOVE(fid), HPX_FORWARD(T, local_result), + HPX_FORWARD(F, op), this_site, generation) + .get(); + } + + template + decltype(auto) reduce_here(hpx::launch::sync_policy, communicator fid, + T&& local_result, F&& op, generation_arg generation, + this_site_arg this_site = this_site_arg()) + { + return reduce_here(HPX_MOVE(fid), HPX_FORWARD(T, local_result), + HPX_FORWARD(F, op), this_site, generation) + .get(); + } + + template + decltype(auto) reduce_here(hpx::launch::sync_policy, char const* basename, + T&& result, F&& op, num_sites_arg num_sites = num_sites_arg(), + this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + return reduce_here(create_communicator(basename, num_sites, this_site, + generation, root_site_arg(this_site.argument_)), + HPX_FORWARD(T, result), HPX_FORWARD(F, op), this_site) + .get(); + } + /////////////////////////////////////////////////////////////////////////// // reduce plain values template @@ -443,6 +476,66 @@ namespace hpx::collectives { this_site, generation, root_site), HPX_FORWARD(T, local_result), this_site); } + + //////////////////////////////////////////////////////////////////////////// + template + void reduce_there(hpx::launch::sync_policy, communicator fid, + T&& local_result, this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + reduce_there( + HPX_MOVE(fid), HPX_FORWARD(T, local_result), this_site, generation) + .get(); + } + + template + void reduce_there(hpx::launch::sync_policy, communicator fid, + T&& local_result, generation_arg generation, + this_site_arg this_site = this_site_arg()) + { + reduce_there( + HPX_MOVE(fid), HPX_FORWARD(T, local_result), this_site, generation) + .get(); + } + + template + void reduce_there(hpx::launch::sync_policy, char const* basename, + T&& local_result, this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg(), + root_site_arg root_site = root_site_arg()) + { + HPX_ASSERT(this_site != root_site); + reduce_there(create_communicator(basename, num_sites_arg(), this_site, + generation, root_site), + HPX_FORWARD(T, local_result), this_site) + .get(); + } + + //////////////////////////////////////////////////////////////////////////// + template + void reduce(communicator fid, T&& local_result, F&& op, + this_site_arg this_site = this_site_arg(), + generation_arg generation = generation_arg()) + { + if (this_site == static_cast(-1)) + { + this_site = static_cast(agas::get_locality_id()); + } + + fid.wait(); // make sure communicator was created + + if (this_site == fid.get_info().second) + { + local_result = reduce_here(hpx::launch::sync, HPX_MOVE(fid), + HPX_FORWARD(T, local_result), HPX_FORWARD(F, op), this_site, + generation); + } + else + { + reduce_there(hpx::launch::sync, HPX_MOVE(fid), + HPX_FORWARD(T, local_result), this_site, generation); + } + } } // namespace hpx::collectives #endif // !HPX_COMPUTE_DEVICE_CODE diff --git a/libs/full/collectives/src/create_communicator.cpp b/libs/full/collectives/src/create_communicator.cpp index cdf70006b2ea..95874e424357 100644 --- a/libs/full/collectives/src/create_communicator.cpp +++ b/libs/full/collectives/src/create_communicator.cpp @@ -200,6 +200,24 @@ namespace hpx::collectives { // find existing communicator return hpx::find_from_basename(HPX_MOVE(name), root_site); } + + /////////////////////////////////////////////////////////////////////////// + // Predefined global communicator + namespace { + communicator world_communicator; + hpx::mutex world_communicator_mtx; + } // namespace + + communicator get_world_communicator() + { + { + std::lock_guard l(world_communicator_mtx); + if (!world_communicator) + world_communicator = + create_communicator("hpx::collectives::world_communicator"); + } + return world_communicator; + } } // namespace hpx::collectives #endif // !HPX_COMPUTE_DEVICE_CODE diff --git a/libs/full/collectives/tests/unit/CMakeLists.txt b/libs/full/collectives/tests/unit/CMakeLists.txt index 6e75b80c4909..b093174ac260 100644 --- a/libs/full/collectives/tests/unit/CMakeLists.txt +++ b/libs/full/collectives/tests/unit/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2024 Hartmut Kaiser +# Copyright (c) 2019-2025 Hartmut Kaiser # # SPDX-License-Identifier: BSL-1.0 # Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -12,6 +12,7 @@ set(tests broadcast broadcast_component broadcast_post + broadcast_sync channel_communicator fold global_spmd_block @@ -28,6 +29,7 @@ if(HPX_WITH_NETWORKING) gather inclusive_scan_ reduce + reduce_sync scatter ) diff --git a/libs/full/collectives/tests/unit/broadcast_sync.cpp b/libs/full/collectives/tests/unit/broadcast_sync.cpp new file mode 100644 index 000000000000..c4989666452a --- /dev/null +++ b/libs/full/collectives/tests/unit/broadcast_sync.cpp @@ -0,0 +1,302 @@ +// Copyright (c) 2020-2025 Hartmut Kaiser +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if !defined(HPX_COMPUTE_DEVICE_CODE) +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace hpx::collectives; + +constexpr char const* broadcast_direct_basename = "/test/broadcast_direct/"; +#if defined(HPX_DEBUG) +constexpr int ITERATIONS = 100; +#else +constexpr int ITERATIONS = 1000; +#endif + +void test_one_shot_use() +{ + std::uint32_t const here = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + // test functionality based on immediate local result value + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + if (here == 0) + { + std::uint32_t result = + broadcast_to(hpx::launch::sync, broadcast_direct_basename, + i + 42, num_sites_arg(num_localities), this_site_arg(here), + generation_arg(i + 1)); + + HPX_TEST_EQ(i + 42, result); + } + else + { + std::uint32_t result = broadcast_from( + hpx::launch::sync, broadcast_direct_basename, + this_site_arg(here), generation_arg(i + 1)); + + HPX_TEST_EQ(i + 42, result); + } + } +} + +void test_multiple_use() +{ + std::uint32_t const here = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const broadcast_direct_client = + create_communicator(broadcast_direct_basename, + num_sites_arg(num_localities), this_site_arg(here)); + + // test functionality based on immediate local result value + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + if (here == 0) + { + std::uint32_t result = broadcast_to( + hpx::launch::sync, broadcast_direct_client, i + 42); + + HPX_TEST_EQ(i + 42, result); + } + else + { + std::uint32_t result = broadcast_from( + hpx::launch::sync, broadcast_direct_client); + + HPX_TEST_EQ(i + 42, result); + } + } +} + +void test_multiple_use_with_generation() +{ + std::uint32_t const here = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const broadcast_direct_client = + create_communicator(broadcast_direct_basename, + num_sites_arg(num_localities), this_site_arg(here)); + + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + if (here == 0) + { + std::uint32_t result = broadcast_to(hpx::launch::sync, + broadcast_direct_client, i + 42, generation_arg(i + 1)); + + HPX_TEST_EQ(i + 42, result); + } + else + { + std::uint32_t result = + broadcast_from(hpx::launch::sync, + broadcast_direct_client, generation_arg(i + 1)); + + HPX_TEST_EQ(i + 42, result); + } + } + + auto const elapsed = t.elapsed(); + if (here == 0) + { + std::cout << "remote timing: " << elapsed / ITERATIONS << "[s]\n"; + } +} + +void test_multiple_use_with_generation_sync() +{ + std::uint32_t const here = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const broadcast_direct_client = + create_communicator(broadcast_direct_basename, + num_sites_arg(num_localities), this_site_arg(here)); + + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != ITERATIONS; ++i) + { + if (here == 0) + { + std::uint32_t result = i + 42; + broadcast(broadcast_direct_client, result, this_site_arg(), + generation_arg(i + 1)); + + HPX_TEST_EQ(i + 42, result); + } + else + { + std::uint32_t result = -1; + broadcast(broadcast_direct_client, result, this_site_arg(), + generation_arg(i + 1)); + + HPX_TEST_EQ(i + 42, result); + } + } + + auto const elapsed = t.elapsed(); + if (here == 0) + { + std::cout << "remote timing: " << elapsed / ITERATIONS << "[s]\n"; + } +} + +void test_local_use() +{ + constexpr std::uint32_t num_sites = 10; + + std::vector> sites; + sites.reserve(num_sites); + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([=]() { + auto const broadcast_direct_client = + create_communicator(broadcast_direct_basename, + num_sites_arg(num_sites), this_site_arg(site)); + + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + // test functionality based on immediate local result value + if (site == 0) + { + std::uint32_t result = + broadcast_to(hpx::launch::sync, broadcast_direct_client, + 42 + i, this_site_arg(site), generation_arg(i + 1)); + + HPX_TEST_EQ(42 + i, result); + } + else + { + std::uint32_t result = broadcast_from( + hpx::launch::sync, broadcast_direct_client, + this_site_arg(site), generation_arg(i + 1)); + + HPX_TEST_EQ(42 + i, result); + } + } + + auto const elapsed = t.elapsed(); + if (site == 0) + { + std::cout << "local timing: " << elapsed / (10 * ITERATIONS) + << "[s]\n"; + } + })); + } + + hpx::wait_all(std::move(sites)); +} + +void test_local_use_sync() +{ + constexpr std::uint32_t num_sites = 10; + + std::vector> sites; + sites.reserve(num_sites); + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([=]() { + auto const broadcast_direct_client = + create_communicator(broadcast_direct_basename, + num_sites_arg(num_sites), this_site_arg(site)); + + hpx::chrono::high_resolution_timer const t; + + for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + { + // test functionality based on immediate local result value + if (site == 0) + { + std::uint32_t result = 42 + i; + broadcast(broadcast_direct_client, result, + this_site_arg(site), generation_arg(i + 1)); + + HPX_TEST_EQ(42 + i, result); + } + else + { + std::uint32_t result = -1; + broadcast(broadcast_direct_client, result, + this_site_arg(site), generation_arg(i + 1)); + + HPX_TEST_EQ(42 + i, result); + } + } + + auto const elapsed = t.elapsed(); + if (site == 0) + { + std::cout << "local timing: " << elapsed / (10 * ITERATIONS) + << "[s]\n"; + } + })); + } + + hpx::wait_all(std::move(sites)); +} + +int hpx_main() +{ +#if defined(HPX_HAVE_NETWORKING) + if (hpx::get_num_localities(hpx::launch::sync) > 1) + { + test_one_shot_use(); + test_multiple_use(); + test_multiple_use_with_generation(); + test_multiple_use_with_generation_sync(); + } +#endif + + if (hpx::get_locality_id() == 0) + { + test_local_use(); + test_local_use_sync(); + } + + return hpx::finalize(); +} + +int main(int argc, char* argv[]) +{ + std::vector const cfg = {"hpx.run_hpx_main!=1"}; + + hpx::init_params init_args; + init_args.cfg = cfg; + + HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0); + return hpx::util::report_errors(); +} + +#endif diff --git a/libs/full/collectives/tests/unit/reduce_sync.cpp b/libs/full/collectives/tests/unit/reduce_sync.cpp new file mode 100644 index 000000000000..72b99b670d4a --- /dev/null +++ b/libs/full/collectives/tests/unit/reduce_sync.cpp @@ -0,0 +1,326 @@ +// Copyright (c) 2019-2025 Hartmut Kaiser +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include + +#if !defined(HPX_COMPUTE_DEVICE_CODE) +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace hpx::collectives; + +constexpr char const* reduce_direct_basename = "/test/reduce_direct/"; +#if defined(HPX_DEBUG) +constexpr int ITERATIONS = 100; +#else +constexpr int ITERATIONS = 1000; +#endif + +void test_one_shot_use() +{ + std::uint32_t const this_locality = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + // test functionality based on immediate local result value + for (int i = 0; i != ITERATIONS; ++i) + { + auto value = this_locality + i; + if (this_locality == 0) + { + std::uint32_t overall_result = reduce_here(hpx::launch::sync, + reduce_direct_basename, std::move(value), + std::plus{}, num_sites_arg(num_localities), + this_site_arg(this_locality), generation_arg(i + 1)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_localities; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result); + } + else + { + reduce_there(hpx::launch::sync, reduce_direct_basename, + std::move(value), this_site_arg(this_locality), + generation_arg(i + 1)); + } + } +} + +void test_multiple_use() +{ + std::uint32_t const this_locality = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const reduce_direct_client = + create_communicator(reduce_direct_basename, + num_sites_arg(num_localities), this_site_arg(this_locality)); + + // test functionality based on immediate local result value + for (int i = 0; i != ITERATIONS; ++i) + { + auto value = this_locality + i; + if (this_locality == 0) + { + std::uint32_t overall_result = + reduce_here(hpx::launch::sync, reduce_direct_client, + std::move(value), std::plus{}); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_localities; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result); + } + else + { + reduce_there( + hpx::launch::sync, reduce_direct_client, std::move(value)); + } + } +} + +void test_multiple_use_with_generation() +{ + std::uint32_t const this_locality = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const reduce_direct_client = + create_communicator(reduce_direct_basename, + num_sites_arg(num_localities), this_site_arg(this_locality)); + + hpx::chrono::high_resolution_timer const t; + + for (int i = 0; i != ITERATIONS; ++i) + { + auto value = this_locality + i; + if (this_locality == 0) + { + std::uint32_t overall_result = reduce_here(hpx::launch::sync, + reduce_direct_client, std::move(value), + std::plus{}, generation_arg(i + 1)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_localities; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result); + } + else + { + reduce_there(hpx::launch::sync, reduce_direct_client, + std::move(value), generation_arg(i + 1)); + } + } + + auto const elapsed = t.elapsed(); + if (this_locality == 0) + { + std::cout << "remote timing: " << elapsed / ITERATIONS << "[s]\n"; + } +} + +void test_multiple_use_with_generation_sync() +{ + std::uint32_t const this_locality = hpx::get_locality_id(); + std::uint32_t const num_localities = + hpx::get_num_localities(hpx::launch::sync); + HPX_TEST_LTE(static_cast(2), num_localities); + + auto const reduce_direct_client = + create_communicator(reduce_direct_basename, + num_sites_arg(num_localities), this_site_arg(this_locality)); + + hpx::chrono::high_resolution_timer const t; + + for (int i = 0; i != ITERATIONS; ++i) + { + auto value = this_locality + i; + if (this_locality == 0) + { + std::uint32_t overall_result = value; + reduce(reduce_direct_client, overall_result, std::plus{}, + this_site_arg(), generation_arg(i + 1)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_localities; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result); + } + else + { + reduce(reduce_direct_client, std::move(value), std::plus{}, + this_site_arg(), generation_arg(i + 1)); + } + } + + auto const elapsed = t.elapsed(); + if (this_locality == 0) + { + std::cout << "remote timing: " << elapsed / ITERATIONS << "[s]\n"; + } +} + +void test_local_use() +{ + constexpr std::uint32_t num_sites = 10; + + std::vector> sites; + sites.reserve(num_sites); + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([=]() { + auto const reduce_direct_client = + create_communicator(reduce_direct_basename, + num_sites_arg(num_sites), this_site_arg(site)); + + hpx::chrono::high_resolution_timer const t; + + // test functionality based on immediate local result value + for (int i = 0; i != ITERATIONS; ++i) + { + auto value = site + i; + if (site == 0) + { + std::uint32_t overall_result = + reduce_here(hpx::launch::sync, reduce_direct_client, + std::move(value), std::plus<>{}, + generation_arg(i + 1), this_site_arg(site)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_sites; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result); + } + else + { + reduce_there(hpx::launch::sync, reduce_direct_client, + std::move(value), generation_arg(i + 1), + this_site_arg(site)); + } + } + + auto const elapsed = t.elapsed(); + if (site == 0) + { + std::cout << "local timing: " << elapsed / (10 * ITERATIONS) + << "[s]\n"; + } + })); + } + + hpx::wait_all(std::move(sites)); +} + +void test_local_use_sync() +{ + constexpr std::uint32_t num_sites = 10; + + std::vector> sites; + sites.reserve(num_sites); + + // launch num_sites threads to represent different sites + for (std::uint32_t site = 0; site != num_sites; ++site) + { + sites.push_back(hpx::async([=]() { + auto const reduce_direct_client = + create_communicator(reduce_direct_basename, + num_sites_arg(num_sites), this_site_arg(site)); + + hpx::chrono::high_resolution_timer const t; + + // test functionality based on immediate local result value + for (int i = 0; i != ITERATIONS; ++i) + { + auto value = site + i; + if (site == 0) + { + std::uint32_t overall_result = value; + reduce(reduce_direct_client, overall_result, std::plus{}, + this_site_arg(site), generation_arg(i + 1)); + + std::uint32_t sum = 0; + for (std::uint32_t j = 0; j != num_sites; ++j) + { + sum += j + i; + } + HPX_TEST_EQ(sum, overall_result); + } + else + { + reduce(reduce_direct_client, std::move(value), std::plus{}, + this_site_arg(site), generation_arg(i + 1)); + } + } + + auto const elapsed = t.elapsed(); + if (site == 0) + { + std::cout << "local timing: " << elapsed / (10 * ITERATIONS) + << "[s]\n"; + } + })); + } + + hpx::wait_all(std::move(sites)); +} + +int hpx_main() +{ +#if defined(HPX_HAVE_NETWORKING) + if (hpx::get_num_localities(hpx::launch::sync) > 1) + { + test_one_shot_use(); + test_multiple_use(); + test_multiple_use_with_generation(); + test_multiple_use_with_generation_sync(); + } +#endif + + if (hpx::get_locality_id() == 0) + { + test_local_use(); + test_local_use_sync(); + } + + return hpx::finalize(); +} + +int main(int argc, char* argv[]) +{ + std::vector const cfg = {"hpx.run_hpx_main!=1"}; + + hpx::init_params init_args; + init_args.cfg = cfg; + + HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0); + return hpx::util::report_errors(); +} + +#endif