Skip to content

Commit

Permalink
loop: add range support to parallel_for_each()
Browse files Browse the repository at this point in the history
before this change, we only support containers, whose `begin()` and
`end()` are of the same type, but ranges do not have this requirement.

in order to support range views, let's relax this requirement. so that,
we can pass, for instance, `std::views::iota(1, 10)` to `parallel_for_each()`.
see also b1b0bff.

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
  • Loading branch information
tchaikov committed Oct 5, 2024
1 parent a30ec0c commit eef51db
Showing 1 changed file with 24 additions and 22 deletions.
46 changes: 24 additions & 22 deletions include/seastar/core/loop.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <iterator>
#include <memory>
#include <optional>
#include <ranges>
#include <type_traits>
#include <vector>
#endif
Expand Down Expand Up @@ -381,15 +382,15 @@ future<> keep_doing(AsyncAction action) noexcept {
}

namespace internal {
template <typename Iterator, typename AsyncAction>
template <typename Iterator, class Sentinel, typename AsyncAction>
class do_for_each_state final : public continuation_base<> {
Iterator _begin;
Iterator _end;
Sentinel _end;
AsyncAction _action;
promise<> _pr;

public:
do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
do_for_each_state(Iterator begin, Sentinel end, AsyncAction action, future<>&& first_unavailable)
: _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
internal::set_callback(std::move(first_unavailable), this);
}
Expand Down Expand Up @@ -422,16 +423,16 @@ public:
}
};

template<typename Iterator, typename AsyncAction>
template<typename Iterator, typename Sentinel, typename AsyncAction>
inline
future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
future<> do_for_each_impl(Iterator begin, Sentinel end, AsyncAction action) {
while (begin != end) {
auto f = futurize_invoke(action, *begin++);
if (f.failed()) {
return f;
}
if (!f.available() || need_preempt()) {
auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
auto* s = new internal::do_for_each_state<Iterator, Sentinel, AsyncAction>{
std::move(begin), std::move(end), std::move(action), std::move(f)};
return s->get_future();
}
Expand All @@ -454,12 +455,15 @@ future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
/// when it is acceptable to process the next item.
/// \return a ready future on success, or the first failed future if
/// \c action failed.
template<typename Iterator, typename AsyncAction>
requires requires (Iterator i, AsyncAction aa) {
{ futurize_invoke(aa, *i) } -> std::same_as<future<>>;
}
template<typename Iterator, typename Sentinel, typename AsyncAction>
requires (
requires (Iterator i, AsyncAction aa) {
{ futurize_invoke(aa, *i) } -> std::same_as<future<>>;
} &&
(std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
)
inline
future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
future<> do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept {
try {
return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
} catch (...) {
Expand All @@ -472,21 +476,20 @@ future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept
/// For each item in a range, call a function, waiting for the previous
/// invocation to complete before calling the next one.
///
/// \param c an \c Container object designating input range
/// \param c an \c Range object designating input range
/// \param action a callable, taking a reference to objects from the range
/// as a parameter, and returning a \c future<> that resolves
/// when it is acceptable to process the next item.
/// \return a ready future on success, or the first failed future if
/// \c action failed.
template<typename Container, typename AsyncAction>
requires requires (Container c, AsyncAction aa) {
{ futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
std::end(c);
template<std::ranges::range Range, typename AsyncAction>
requires requires (std::ranges::range_value_t<R> v, AsyncAction aa) {
{ futurize_invoke(aa, v) } -> std::same_as<future<>>;
}
inline
future<> do_for_each(Container& c, AsyncAction action) noexcept {
future<> do_for_each(Range& c, AsyncAction action) noexcept {
try {
return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
return internal::do_for_each_impl(std::ranges::begin(c), std::ranges::end(c), std::move(action));
} catch (...) {
return current_exception_as_future();
}
Expand Down Expand Up @@ -628,10 +631,9 @@ parallel_for_each_impl(Range&& range, Func&& func) {

} // namespace internal

template <typename Range, typename Func>
requires requires (Func f, Range r) {
{ f(*std::begin(r)) } -> std::same_as<future<>>;
std::end(r);
template <std::ranges::range Range, typename Func>
requires requires (Func f, std::ranges::range_value_t<Range> v) {
{ f(v) } -> std::same_as<future<>>;
}
inline
future<>
Expand Down

0 comments on commit eef51db

Please sign in to comment.