Skip to content

Commit

Permalink
Always return outermost thread id
Browse files Browse the repository at this point in the history
-flyby: deprecate get_outer_self_id
  • Loading branch information
hkaiser committed Jan 17, 2024
1 parent 7b35448 commit 2aba827
Show file tree
Hide file tree
Showing 36 changed files with 565 additions and 429 deletions.
11 changes: 5 additions & 6 deletions components/iostreams/src/server/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace hpx::iostreams::detail {
ar << valid;
if (valid)
{
ar& data_;
ar & data_;
}
}

Expand All @@ -44,7 +44,7 @@ namespace hpx::iostreams::detail {
ar >> valid;
if (valid)
{
ar& data_;
ar & data_;
}
}
} // namespace hpx::iostreams::detail
Expand Down Expand Up @@ -89,10 +89,9 @@ namespace hpx::iostreams::server {
{ // {{{
// Perform the IO in another OS thread.
detail::buffer in(buf_in);
hpx::get_thread_pool("io_pool")->get_io_service().post(
hpx::bind_front(&output_stream::call_write_sync, this, locality_id,
count, std::ref(in),
threads::thread_id_ref_type(threads::get_outer_self_id())));
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
&output_stream::call_write_sync, this, locality_id, count,
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));

// Sleep until the worker thread wakes us up.
this_thread::suspend(threads::thread_schedule_state::suspended,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -63,6 +63,9 @@ namespace examples::server {
}
~reset_id()
{
auto const mtx = outer_.mtx_;
std::lock_guard<hpx::mutex> l(*mtx);

[[maybe_unused]] hpx::thread::id const old_value = outer_.id_;
outer_.id_ = hpx::thread::id();
HPX_ASSERT(old_value != hpx::thread::id());
Expand Down Expand Up @@ -104,9 +107,15 @@ namespace examples::server {
});

auto const mtx = mtx_;
std::lock_guard<hpx::mutex> l(*mtx);
HPX_ASSERT(id_ != hpx::thread::id());
hpx::thread::interrupt(id_);

std::unique_lock<hpx::mutex> l(*mtx);
auto const id = id_;

if (id != hpx::thread::id())
{
l.unlock();
hpx::thread::interrupt(id);
}
}

HPX_DEFINE_COMPONENT_ACTION(cancelable_action, do_it, do_it_action)
Expand Down
8 changes: 7 additions & 1 deletion libs/core/coroutines/include/hpx/coroutines/thread_enums.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -425,6 +425,12 @@ namespace hpx::threads {
runs_as_child_mode_bits = static_cast<std::uint8_t>(bits);
}

void schedule_hint(std::int16_t core) noexcept
{
mode = thread_schedule_hint_mode::thread;
hint = core;
}

/// The hint associated with the mode. The interpretation of this hint
/// depends on the given mode.
std::int16_t hint = -1;
Expand Down
35 changes: 24 additions & 11 deletions libs/core/execution/tests/unit/bulk_async.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2015 Daniel Bourgeois
// Copyright (c) 2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -16,16 +17,16 @@
#include <vector>

////////////////////////////////////////////////////////////////////////////////
int bulk_test(
hpx::thread::id tid, int value, bool is_par, int passed_through) //-V813
int bulk_test(hpx::thread::id const& tid, int value, bool is_par,
int passed_through) //-V813
{
HPX_TEST_EQ(is_par, (tid != hpx::this_thread::get_id()));
HPX_TEST_EQ(passed_through, 42);
return value;
}

template <typename Executor>
void test_bulk_sync(Executor& exec)
void test_bulk_sync(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -35,14 +36,15 @@ void test_bulk_sync(Executor& exec)
using hpx::placeholders::_1;
using hpx::placeholders::_2;

std::vector<int> results = hpx::parallel::execution::bulk_sync_execute(
exec, hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);
std::vector<int> results =
hpx::parallel::execution::bulk_sync_execute(HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, false, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v)));
}

template <typename Executor>
void test_bulk_async(Executor& exec)
void test_bulk_async(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -54,31 +56,42 @@ void test_bulk_async(Executor& exec)

std::vector<hpx::future<int>> results =
hpx::parallel::execution::bulk_async_execute(
exec, hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);
HPX_FORWARD(Executor, exec),
hpx::bind(&bulk_test, tid, _1, true, _2), v, 42);

HPX_TEST(std::equal(std::begin(results), std::end(results), std::begin(v),
[](hpx::future<int>& lhs, const int& rhs) {
return lhs.get() == rhs;
}));
}

template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

////////////////////////////////////////////////////////////////////////////////
int hpx_main()
{
hpx::execution::sequenced_executor seq_exec;
test_bulk_sync(seq_exec);
test_bulk_sync(disable_run_as_child(seq_exec));

hpx::execution::parallel_executor par_exec;
hpx::execution::parallel_executor par_fork_exec(hpx::launch::fork);
test_bulk_async(par_exec);
test_bulk_async(par_fork_exec);
test_bulk_async(disable_run_as_child(par_exec));
test_bulk_async(disable_run_as_child(par_fork_exec));

return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down
71 changes: 39 additions & 32 deletions libs/core/execution/tests/unit/minimal_async_executor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2022 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -23,6 +23,16 @@
#include <vector>

///////////////////////////////////////////////////////////////////////////////
template <typename Executor>
decltype(auto) disable_run_as_child(Executor&& exec)
{
auto hint = hpx::execution::experimental::get_hint(exec);
hint.runs_as_child_mode(hpx::threads::thread_execution_hint::none);

return hpx::experimental::prefer(hpx::execution::experimental::with_hint,
HPX_FORWARD(Executor, exec), hint);
}

hpx::thread::id async_test(int passed_through)
{
HPX_TEST_EQ(passed_through, 42);
Expand All @@ -36,7 +46,8 @@ void apply_test(hpx::latch& l, hpx::thread::id& id, int passed_through)
l.count_down(1);
}

void async_bulk_test(int, hpx::thread::id tid, int passed_through) //-V813
void async_bulk_test(
int, hpx::thread::id const& tid, int passed_through) //-V813
{
HPX_TEST_NEQ(tid, hpx::this_thread::get_id());
HPX_TEST_EQ(passed_through, 42);
Expand All @@ -57,22 +68,22 @@ void test_apply(Executor& exec)
}

template <typename Executor>
void test_sync(Executor& exec)
void test_sync(Executor&& exec)
{
HPX_TEST(hpx::parallel::execution::sync_execute(exec, &async_test, 42) !=
hpx::this_thread::get_id());
}

template <typename Executor>
void test_async(Executor& exec)
void test_async(Executor&& exec)
{
HPX_TEST(
hpx::parallel::execution::async_execute(exec, &async_test, 42).get() !=
hpx::this_thread::get_id());
}

template <typename Executor>
void test_bulk_sync(Executor& exec)
void test_bulk_sync(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand All @@ -89,7 +100,7 @@ void test_bulk_sync(Executor& exec)
}

template <typename Executor>
void test_bulk_async(Executor& exec)
void test_bulk_async(Executor&& exec)
{
hpx::thread::id tid = hpx::this_thread::get_id();

Expand Down Expand Up @@ -128,7 +139,7 @@ void test_executor(std::array<std::size_t, 5> expected)
count_bulk_sync.store(0);
count_bulk_async.store(0);

Executor exec;
auto exec = disable_run_as_child(Executor());

test_apply(exec);
test_sync(exec);
Expand Down Expand Up @@ -158,12 +169,11 @@ struct test_async_executor1
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor1> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor1>
: std::true_type
{
};

struct test_async_executor2 : test_async_executor1
{
Expand All @@ -180,12 +190,11 @@ struct test_async_executor2 : test_async_executor1
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor2> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor2>
: std::true_type
{
};

struct test_async_executor3 : test_async_executor1
{
Expand All @@ -206,12 +215,11 @@ struct test_async_executor3 : test_async_executor1
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor3> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor3>
: std::true_type
{
};

struct test_async_executor4 : test_async_executor1
{
Expand Down Expand Up @@ -257,12 +265,11 @@ struct test_async_executor5 : test_async_executor1
}
};

namespace hpx::parallel::execution {
template <>
struct is_two_way_executor<test_async_executor5> : std::true_type
{
};
} // namespace hpx::parallel::execution
template <>
struct hpx::parallel::execution::is_two_way_executor<test_async_executor5>
: std::true_type
{
};

///////////////////////////////////////////////////////////////////////////////
int hpx_main()
Expand All @@ -278,7 +285,7 @@ int hpx_main()

int main(int argc, char* argv[])
{
// By default this test should run on all available cores
// By default, this test should run on all available cores
std::vector<std::string> const cfg = {"hpx.os_threads=all"};

// Initialize and run HPX
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2019-2020 ETH Zurich
// Copyright (c) 2007-2022 Hartmut Kaiser
// Copyright (c) 2007-2024 Hartmut Kaiser
// Copyright (c) 2019 Agustin Berge
//
// SPDX-License-Identifier: BSL-1.0
Expand Down Expand Up @@ -52,7 +52,7 @@ namespace hpx::parallel::execution::detail {
HPX_ASSERT(pool);

using result_type = std::vector<
hpx::future<typename detail::bulk_function_result_t<F, S, Ts...>>>;
hpx::future<detail::bulk_function_result_t<F, S, Ts...>>>;

result_type results;
std::size_t const size = hpx::util::size(shape);
Expand All @@ -69,9 +69,10 @@ namespace hpx::parallel::execution::detail {
std::size_t const part_end = ((t + 1) * size) / num_threads;
std::size_t const part_size = part_end - part_begin;

auto async_policy = hpx::execution::experimental::with_hint(policy,
threads::thread_schedule_hint{
static_cast<std::int16_t>(first_thread + t)});
auto hint = hpx::execution::experimental::get_hint(policy);
hint.schedule_hint(static_cast<std::int16_t>(first_thread + t));
auto async_policy =
hpx::execution::experimental::with_hint(policy, hint);

if (hierarchical_threshold != 0 &&
part_size > hierarchical_threshold)
Expand Down Expand Up @@ -161,10 +162,13 @@ namespace hpx::parallel::execution::detail {
auto it = std::begin(shape);
for (std::size_t t = 0; t != num_threads; ++t)
{
auto inner_hint =
hpx::execution::experimental::get_hint(policy);
inner_hint.schedule_hint(
static_cast<std::int16_t>(first_thread + t));
auto inner_post_policy =
hpx::execution::experimental::with_hint(policy,
threads::thread_schedule_hint{
static_cast<std::int16_t>(first_thread + t)});
hpx::execution::experimental::with_hint(
policy, inner_hint);

std::size_t const end = ((t + 1) * size) / num_threads;
std::size_t const part_size = end - begin;
Expand Down
Loading

0 comments on commit 2aba827

Please sign in to comment.