From 79f47ed5c50a4574bf25eca259d7d2da3dd5eeea Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sat, 5 Oct 2024 21:49:10 +0800 Subject: [PATCH] loop: add range support to parallel_for_each() before this change, we only support containers, whose `begin()` and `end()` are of the same type, but ranges do not have this requirement. in order to support range views, let's relax this requirement. so that, we can pass, for instance, `std::views::iota(1, 10)` to `parallel_for_each()`. see also b1b0bff1b8b3a94792f7a37b203ae749d54a1d83. Signed-off-by: Kefu Chai --- include/seastar/core/loop.hh | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/include/seastar/core/loop.hh b/include/seastar/core/loop.hh index b37f1d6b1f7..f029f081102 100644 --- a/include/seastar/core/loop.hh +++ b/include/seastar/core/loop.hh @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #endif @@ -381,15 +382,15 @@ future<> keep_doing(AsyncAction action) noexcept { } namespace internal { -template +template class do_for_each_state final : public continuation_base<> { Iterator _begin; - Iterator _end; + Sentinel _end; AsyncAction _action; promise<> _pr; public: - do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable) + do_for_each_state(Iterator begin, Sentinel end, AsyncAction action, future<>&& first_unavailable) : _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) { internal::set_callback(std::move(first_unavailable), this); } @@ -422,16 +423,16 @@ public: } }; -template +template inline -future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) { +future<> do_for_each_impl(Iterator begin, Sentinel end, AsyncAction action) { while (begin != end) { auto f = futurize_invoke(action, *begin++); if (f.failed()) { return f; } if (!f.available() || need_preempt()) { - auto* s = new internal::do_for_each_state{ + auto* s = new internal::do_for_each_state{ std::move(begin), std::move(end), std::move(action), std::move(f)}; return s->get_future(); } @@ -454,12 +455,15 @@ future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) { /// when it is acceptable to process the next item. /// \return a ready future on success, or the first failed future if /// \c action failed. -template -requires requires (Iterator i, AsyncAction aa) { - { futurize_invoke(aa, *i) } -> std::same_as>; -} +template +requires ( + requires (Iterator i, AsyncAction aa) { + { futurize_invoke(aa, *i) } -> std::same_as>; + } && + (std::same_as || std::sentinel_for) +) inline -future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept { +future<> do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept { try { return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action)); } catch (...) { @@ -472,19 +476,19 @@ future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept /// For each item in a range, call a function, waiting for the previous /// invocation to complete before calling the next one. /// -/// \param c an \c Container object designating input range +/// \param c an \c Range object designating input range /// \param action a callable, taking a reference to objects from the range /// as a parameter, and returning a \c future<> that resolves /// when it is acceptable to process the next item. /// \return a ready future on success, or the first failed future if /// \c action failed. -template -requires requires (Container c, AsyncAction aa) { +template +requires requires (Range c, AsyncAction aa) { { futurize_invoke(aa, *std::begin(c)) } -> std::same_as>; std::end(c); } inline -future<> do_for_each(Container& c, AsyncAction action) noexcept { +future<> do_for_each(Range& c, AsyncAction action) noexcept { try { return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action)); } catch (...) {