Skip to content

Commit

Permalink
Fix parallel deterministic reduce and add benchmarks
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyas Atre <shreyasatre16@gmail.com>
  • Loading branch information
SAtacker committed Jan 1, 2025
1 parent b9eca23 commit c205a13
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,71 @@ namespace hpx::parallel::detail {
}
};

template <typename ExPolicy>
struct sequential_reduce_deterministic_rfa_t final
: hpx::functional::detail::tag_fallback<
sequential_reduce_deterministic_rfa_t<ExPolicy>>
{
private:
template <typename InIterB, typename T>
friend constexpr hpx::parallel::detail::rfa::
ReproducibleFloatingAccumulator<T>
tag_fallback_invoke(sequential_reduce_deterministic_rfa_t,
ExPolicy&&, InIterB first, std::size_t partition_size, T init,
std::true_type&&)
{
hpx::parallel::detail::rfa::RFA_bins<T> bins;
bins.initialize_bins();
std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins));

hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<T> rfa;
rfa.set_max_abs_val(init);
rfa.unsafe_add(init);
rfa.renorm();
size_t count = 0;
T max_val = std::abs(*first);
std::size_t partition_size_lim = 0;
for (auto e = first; partition_size_lim <= partition_size;
partition_size_lim++, e++)
{
T temp_max_val = std::abs(static_cast<T>(*e));
if (max_val < temp_max_val)
{
rfa.set_max_abs_val(temp_max_val);
max_val = temp_max_val;
}
rfa.unsafe_add(*e);
count++;
if (count == rfa.endurance())
{
rfa.renorm();
count = 0;
}
}
return rfa;
}

template <typename InIterB, typename T>
friend constexpr T tag_fallback_invoke(
sequential_reduce_deterministic_rfa_t, ExPolicy&&, InIterB first,
std::size_t partition_size, T init, std::false_type&&)
{
hpx::parallel::detail::rfa::RFA_bins<typename T::ftype> bins;
bins.initialize_bins();
std::memcpy(rfa::__rfa_bin_host_buffer__, &bins, sizeof(bins));

T rfa;
rfa += init;
std::size_t partition_size_lim = 0;
for (auto e = first; partition_size_lim <= partition_size;
partition_size_lim++, e++)
{
rfa += (*e);
}
return rfa;
}
};

#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_t<ExPolicy>
Expand All @@ -80,4 +145,18 @@ namespace hpx::parallel::detail {
}
#endif

#if !defined(HPX_COMPUTE_DEVICE_CODE)
template <typename ExPolicy>
inline constexpr sequential_reduce_deterministic_rfa_t<ExPolicy>
sequential_reduce_deterministic_rfa =
sequential_reduce_deterministic_rfa_t<ExPolicy>{};
#else
template <typename ExPolicy, typename... Args>
HPX_HOST_DEVICE HPX_FORCEINLINE auto sequential_reduce_deterministic_rfa(
Args&&... args)
{
return sequential_reduce_deterministic_rfa_t<ExPolicy>{}(
std::forward<Args>(args)...);
}
#endif
} // namespace hpx::parallel::detail
Original file line number Diff line number Diff line change
Expand Up @@ -407,35 +407,37 @@ namespace hpx::parallel {
ExPolicy&& policy, FwdIterB first, FwdIterE last, T_&& init,
Reduce&& r)
{
(void)r;
if (first == last)
{
return util::detail::algorithm_result<ExPolicy, T>::get(
HPX_FORWARD(T_, init));
}

auto f1 = [r, policy](
FwdIterB part_begin, std::size_t part_size)
auto f1 = [policy](FwdIterB part_begin, std::size_t part_size)
-> hpx::parallel::detail::rfa::
ReproducibleFloatingAccumulator<T_> {
T val = *part_begin;
T_ val = *part_begin;
return hpx::parallel::detail::
sequential_reduce_deterministic<ExPolicy>(
sequential_reduce_deterministic_rfa<ExPolicy>(
HPX_FORWARD(ExPolicy, policy), ++part_begin,
--part_size, HPX_MOVE(val), r);
--part_size, HPX_MOVE(val),
std::true_type{});
};

return util::partitioner<ExPolicy, T,
return util::partitioner<ExPolicy, T_,
hpx::parallel::detail::rfa::ReproducibleFloatingAccumulator<
T_>>::call(HPX_FORWARD(ExPolicy, policy), first,
detail::distance(first, last), HPX_MOVE(f1),
hpx::unwrapping([init = HPX_FORWARD(T_, init),
r = HPX_FORWARD(Reduce, r),
policy](auto&& results) -> T {
hpx::unwrapping([policy](auto&& results) -> T_ {
return hpx::parallel::detail::
sequential_reduce_deterministic<ExPolicy>(
sequential_reduce_deterministic_rfa<ExPolicy>(
HPX_FORWARD(ExPolicy, policy),
hpx::util::begin(results),
hpx::util::size(results), init, r)
hpx::util::size(results),
hpx::parallel::detail::rfa::
ReproducibleFloatingAccumulator<T_>{},
std::false_type{})
.conv();
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <cstddef>

#if !defined(HPX_COMPUTE_DEVICE_CODE)
#include <hpx/algorithm.hpp>
Expand Down Expand Up @@ -33,15 +34,15 @@ T get_rand(T LO = (std::numeric_limits<T>::min)(),

///////////////////////////////////////////////////////////////////////////////

void bench_reduce_deterministic(
void bench_reduce_deterministic(const auto& policy,
const auto& deterministic_shuffled, const auto& val_det, const auto& op)
{
// check if different type for deterministic and nondeeterministic
// and same result

auto r1_shuffled =
hpx::reduce_deterministic((std::begin(deterministic_shuffled)),
(std::end(deterministic_shuffled)), val_det, op);
hpx::reduce_deterministic(policy, std::begin(deterministic_shuffled),
std::end(deterministic_shuffled), val_det, op);

HPX_UNUSED(r1_shuffled);
}
Expand All @@ -61,6 +62,7 @@ int hpx_main(hpx::program_options::variables_map& vm)
std::srand(seed);

auto test_count = vm["test_count"].as<int>();
std::size_t vector_size = vm["vector-size"].as<std::size_t>();

hpx::util::perftests_init(vm);

Expand All @@ -74,7 +76,7 @@ int hpx_main(hpx::program_options::variables_map& vm)

{
using FloatTypeDeterministic = float;
std::size_t LEN = 10000;
std::size_t LEN = vector_size;

constexpr FloatTypeDeterministic num_bounds_det =
std::is_same_v<FloatTypeDeterministic, float> ? 1000.0 : 1000000.0;
Expand Down Expand Up @@ -113,7 +115,14 @@ int hpx_main(hpx::program_options::variables_map& vm)
{
hpx::util::perftests_report(
"reduce deterministic", "seq", test_count, [&]() {
bench_reduce_deterministic(
bench_reduce_deterministic(hpx::execution::seq,
deterministic_shuffled, val_det, op);
});
}
{
hpx::util::perftests_report(
"reduce deterministic", "par", test_count, [&]() {
bench_reduce_deterministic(hpx::execution::par,
deterministic_shuffled, val_det, op);
});
}
Expand All @@ -135,6 +144,8 @@ int main(int argc, char* argv[])
cmdline.add_options()
("test_count", value<int>()->default_value(100),
"number of tests to be averaged")
("vector-size", value<std::size_t>()->default_value(1000000),
"number of elements to be reduced")
;
// clang-format on

Expand Down

0 comments on commit c205a13

Please sign in to comment.