diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 7f40fcd30d82..ce7073b4153c 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -106,6 +106,11 @@ bug_fixes: - area: tracers change: | Avoid possible overflow when setting span attributes in Dynatrace sampler. +- area: load_balancing + change: | + Fixed default host weight calculation of :ref:`client_side_weighted_round_robin + ` + to properly handle even number of valid host weights. removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` diff --git a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/BUILD b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/BUILD index e85859736797..1908c9089d37 100644 --- a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/BUILD +++ b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/BUILD @@ -27,6 +27,8 @@ envoy_cc_library( srcs = ["client_side_weighted_round_robin_lb.cc"], hdrs = ["client_side_weighted_round_robin_lb.h"], deps = [ + "//envoy/thread_local:thread_local_interface", + "//source/common/common:callback_impl_lib", "//source/common/orca:orca_load_metrics_lib", "//source/extensions/load_balancing_policies/common:load_balancer_lib", "//source/extensions/load_balancing_policies/round_robin:round_robin_lb_lib", diff --git a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.cc b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.cc index 87226753f089..46f2b35d5d56 100644 --- a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.cc +++ b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.cc @@ -30,8 +30,9 @@ std::string getHostAddress(const Host* host) { } // namespace ClientSideWeightedRoundRobinLbConfig::ClientSideWeightedRoundRobinLbConfig( - const ClientSideWeightedRoundRobinLbProto& lb_proto, Event::Dispatcher& main_thread_dispatcher) - : main_thread_dispatcher_(main_thread_dispatcher) { + const ClientSideWeightedRoundRobinLbProto& lb_proto, Event::Dispatcher& main_thread_dispatcher, + ThreadLocal::SlotAllocator& tls_slot_allocator) + : main_thread_dispatcher_(main_thread_dispatcher), tls_slot_allocator_(tls_slot_allocator) { ENVOY_LOG_MISC(trace, "ClientSideWeightedRoundRobinLbConfig config {}", lb_proto.DebugString()); metric_names_for_computing_utilization = std::vector(lb_proto.metric_names_for_computing_utilization().begin(), @@ -50,12 +51,18 @@ ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb::WorkerLocalLb( Runtime::Loader& runtime, Random::RandomGenerator& random, const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config, const ClientSideWeightedRoundRobinLbConfig& client_side_weighted_round_robin_config, - TimeSource& time_source) + TimeSource& time_source, OptRef tls_shim) : RoundRobinLoadBalancer(priority_set, local_priority_set, stats, runtime, random, common_config, /*round_robin_config=*/std::nullopt, time_source) { orca_load_report_handler_ = std::make_shared(client_side_weighted_round_robin_config, time_source); + if (tls_shim.has_value()) { + apply_weights_cb_handle_ = tls_shim->apply_weights_cb_helper_.add([this](uint32_t priority) { + refresh(priority); + return absl::OkStatus(); + }); + } } HostConstSharedPtr @@ -107,13 +114,17 @@ void ClientSideWeightedRoundRobinLoadBalancer::startWeightUpdatesOnMainThread( void ClientSideWeightedRoundRobinLoadBalancer::updateWeightsOnMainThread() { ENVOY_LOG(trace, "updateWeightsOnMainThread"); for (const HostSetPtr& host_set : priority_set_.hostSetsPerPriority()) { - updateWeightsOnHosts(host_set->hosts()); + if (updateWeightsOnHosts(host_set->hosts())) { + // If weights have changed, then apply them to all workers. + factory_->applyWeightsToAllWorkers(host_set->priority()); + } } } -void ClientSideWeightedRoundRobinLoadBalancer::updateWeightsOnHosts(const HostVector& hosts) { +bool ClientSideWeightedRoundRobinLoadBalancer::updateWeightsOnHosts(const HostVector& hosts) { std::vector weights; HostVector hosts_with_default_weight; + bool weights_updated = false; const MonotonicTime now = time_source_.monotonicTime(); // Weight is considered invalid (too recent) if it was first updated within `blackout_period_`. const MonotonicTime max_non_empty_since = now - blackout_period_; @@ -132,28 +143,48 @@ void ClientSideWeightedRoundRobinLoadBalancer::updateWeightsOnHosts(const HostVe // If `client_side_weight` is valid, then set it as the host weight and store it in // `weights` to calculate median valid weight across all hosts. if (client_side_weight.has_value()) { - weights.push_back(*client_side_weight); - host_ptr->weight(*client_side_weight); - ENVOY_LOG(trace, "updateWeights hostWeight {} = {}", getHostAddress(host_ptr.get()), - host_ptr->weight()); + const uint32_t new_weight = client_side_weight.value(); + weights.push_back(new_weight); + if (new_weight != host_ptr->weight()) { + host_ptr->weight(new_weight); + ENVOY_LOG(trace, "updateWeights hostWeight {} = {}", getHostAddress(host_ptr.get()), + host_ptr->weight()); + weights_updated = true; + } } else { // If `client_side_weight` is invalid, then set host to default (median) weight. hosts_with_default_weight.push_back(host_ptr); } } - // Calculate the default weight as median of all valid weights. - uint32_t default_weight = 1; - if (!weights.empty()) { - auto median_it = weights.begin() + weights.size() / 2; - std::nth_element(weights.begin(), median_it, weights.end()); - default_weight = *median_it; - } - // Update the hosts with default weight. - for (const auto& host_ptr : hosts_with_default_weight) { - host_ptr->weight(default_weight); - ENVOY_LOG(trace, "updateWeights default hostWeight {} = {}", getHostAddress(host_ptr.get()), - host_ptr->weight()); + // If some hosts don't have valid weight, then update them with default weight. + if (!hosts_with_default_weight.empty()) { + // Calculate the default weight as median of all valid weights. + uint32_t default_weight = 1; + if (!weights.empty()) { + const auto median_it = weights.begin() + weights.size() / 2; + std::nth_element(weights.begin(), median_it, weights.end()); + if (weights.size() % 2 == 1) { + default_weight = *median_it; + } else { + // If the number of weights is even, then the median is the average of the two middle + // elements. + const auto lower_median_it = std::max_element(weights.begin(), median_it); + // Use uint64_t to avoid potential overflow of the weights sum. + default_weight = static_cast( + (static_cast(*lower_median_it) + static_cast(*median_it)) / 2); + } + } + // Update the hosts with default weight. + for (const auto& host_ptr : hosts_with_default_weight) { + if (default_weight != host_ptr->weight()) { + host_ptr->weight(default_weight); + ENVOY_LOG(trace, "updateWeights default hostWeight {} = {}", getHostAddress(host_ptr.get()), + host_ptr->weight()); + weights_updated = true; + } + } } + return weights_updated; } void ClientSideWeightedRoundRobinLoadBalancer::addClientSideLbPolicyDataToHosts( @@ -246,7 +277,16 @@ Upstream::LoadBalancerPtr ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalL ASSERT(typed_lb_config != nullptr); return std::make_unique( params.priority_set, params.local_priority_set, cluster_info_.lbStats(), runtime_, random_, - cluster_info_.lbConfig(), *typed_lb_config, time_source_); + cluster_info_.lbConfig(), *typed_lb_config, time_source_, tls_->get()); +} + +void ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLbFactory::applyWeightsToAllWorkers( + uint32_t priority) { + tls_->runOnAllThreads([priority](OptRef tls_shim) -> void { + if (tls_shim.has_value()) { + auto status = tls_shim->apply_weights_cb_helper_.runCallbacks(priority); + } + }); } ClientSideWeightedRoundRobinLoadBalancer::ClientSideWeightedRoundRobinLoadBalancer( @@ -265,8 +305,9 @@ absl::Status ClientSideWeightedRoundRobinLoadBalancer::initialize() { } // Setup a callback to receive priority set updates. priority_update_cb_ = priority_set_.addPriorityUpdateCb( - [](uint32_t, const HostVector& hosts_added, const HostVector&) -> absl::Status { + [this](uint32_t, const HostVector& hosts_added, const HostVector&) -> absl::Status { addClientSideLbPolicyDataToHosts(hosts_added); + updateWeightsOnMainThread(); return absl::OkStatus(); }); diff --git a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.h b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.h index ddfa06cdf143..ec65d7d5f3b0 100644 --- a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.h +++ b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb.h @@ -1,8 +1,11 @@ #pragma once #include "envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3/client_side_weighted_round_robin.pb.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/thread_local/thread_local_object.h" #include "envoy/upstream/upstream.h" +#include "source/common/common/callback_impl.h" #include "source/extensions/load_balancing_policies/common/load_balancer_impl.h" #include "source/extensions/load_balancing_policies/round_robin/round_robin_lb.h" @@ -21,7 +24,8 @@ using OrcaLoadReportProto = xds::data::orca::v3::OrcaLoadReport; class ClientSideWeightedRoundRobinLbConfig : public Upstream::LoadBalancerConfig { public: ClientSideWeightedRoundRobinLbConfig(const ClientSideWeightedRoundRobinLbProto& lb_proto, - Event::Dispatcher& main_thread_dispatcher); + Event::Dispatcher& main_thread_dispatcher, + ThreadLocal::SlotAllocator& tls_slot_allocator); // Parameters for weight calculation from Orca Load report. std::vector metric_names_for_computing_utilization; @@ -32,6 +36,7 @@ class ClientSideWeightedRoundRobinLbConfig : public Upstream::LoadBalancerConfig std::chrono::milliseconds weight_update_period; Event::Dispatcher& main_thread_dispatcher_; + ThreadLocal::SlotAllocator& tls_slot_allocator_; }; /** @@ -131,6 +136,12 @@ class ClientSideWeightedRoundRobinLoadBalancer : public Upstream::ThreadAwareLoa TimeSource& time_source_; }; + // Thread local shim to store callbacks for weight updates of worker local lb. + class ThreadLocalShim : public Envoy::ThreadLocal::ThreadLocalObject { + public: + Common::CallbackManager apply_weights_cb_helper_; + }; + // This class is used to handle the load balancing on the worker thread. class WorkerLocalLb : public RoundRobinLoadBalancer { public: @@ -139,15 +150,15 @@ class ClientSideWeightedRoundRobinLoadBalancer : public Upstream::ThreadAwareLoa ClusterLbStats& stats, Runtime::Loader& runtime, Random::RandomGenerator& random, const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config, const ClientSideWeightedRoundRobinLbConfig& client_side_weighted_round_robin_config, - TimeSource& time_source); + TimeSource& time_source, OptRef tls_shim); private: friend class ClientSideWeightedRoundRobinLoadBalancerFriend; HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; - bool alwaysUseEdfScheduler() const override { return true; }; std::shared_ptr orca_load_report_handler_; + Common::CallbackHandlePtr apply_weights_cb_handle_; }; // Factory used to create worker-local load balancer on the worker thread. @@ -158,14 +169,24 @@ class ClientSideWeightedRoundRobinLoadBalancer : public Upstream::ThreadAwareLoa const Upstream::PrioritySet& priority_set, Runtime::Loader& runtime, Envoy::Random::RandomGenerator& random, TimeSource& time_source) : lb_config_(lb_config), cluster_info_(cluster_info), priority_set_(priority_set), - runtime_(runtime), random_(random), time_source_(time_source) {} + runtime_(runtime), random_(random), time_source_(time_source) { + const auto* typed_lb_config = + dynamic_cast(lb_config.ptr()); + ASSERT(typed_lb_config != nullptr); + tls_ = + ThreadLocal::TypedSlot::makeUnique(typed_lb_config->tls_slot_allocator_); + tls_->set([](Envoy::Event::Dispatcher&) { return std::make_shared(); }); + } Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams params) override; bool recreateOnHostChange() const override { return false; } + void applyWeightsToAllWorkers(uint32_t priority); + protected: OptRef lb_config_; + std::unique_ptr> tls_; const Upstream::ClusterInfo& cluster_info_; const Upstream::PrioritySet& priority_set_; @@ -200,7 +221,8 @@ class ClientSideWeightedRoundRobinLoadBalancer : public Upstream::ThreadAwareLoa void updateWeightsOnMainThread(); // Update weights using client side host LB policy data for all `hosts`. - void updateWeightsOnHosts(const HostVector& hosts); + // Returns true if any host weight is updated. + bool updateWeightsOnHosts(const HostVector& hosts); // Add client side host LB policy data to all `hosts`. static void addClientSideLbPolicyDataToHosts(const HostVector& hosts); diff --git a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/config.h b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/config.h index 6222714dcaee..6cb8a1bcb99d 100644 --- a/source/extensions/load_balancing_policies/client_side_weighted_round_robin/config.h +++ b/source/extensions/load_balancing_policies/client_side_weighted_round_robin/config.h @@ -16,7 +16,6 @@ namespace ClientSideWeightedRoundRobin { using ClientSideWeightedRoundRobinLbProto = envoy::extensions::load_balancing_policies:: client_side_weighted_round_robin::v3::ClientSideWeightedRoundRobin; -// using ClusterProto = envoy::config::cluster::v3::Cluster; class Factory : public Upstream::TypedLoadBalancerFactoryBase { public: @@ -38,7 +37,7 @@ class Factory : public Upstream::TypedLoadBalancerFactoryBase(config); return Upstream::LoadBalancerConfigPtr{new Upstream::ClientSideWeightedRoundRobinLbConfig( - lb_config, context.mainThreadDispatcher())}; + lb_config, context.mainThreadDispatcher(), context.threadLocal())}; } }; diff --git a/source/extensions/load_balancing_policies/common/load_balancer_impl.cc b/source/extensions/load_balancing_policies/common/load_balancer_impl.cc index 8a04de4c8b52..847e29084dcf 100644 --- a/source/extensions/load_balancing_policies/common/load_balancer_impl.cc +++ b/source/extensions/load_balancing_policies/common/load_balancer_impl.cc @@ -917,7 +917,7 @@ void EdfLoadBalancerBase::refresh(uint32_t priority) { // case EDF creation is skipped. When all original weights are equal and no hosts are in slow // start mode we can rely on unweighted host pick to do optimal round robin and least-loaded // host selection with lower memory and CPU overhead. - if (!alwaysUseEdfScheduler() && hostWeightsAreEqual(hosts) && noHostsAreInSlowStart()) { + if (hostWeightsAreEqual(hosts) && noHostsAreInSlowStart()) { // Skip edf creation. return; } @@ -963,8 +963,6 @@ void EdfLoadBalancerBase::refresh(uint32_t priority) { } } -bool EdfLoadBalancerBase::alwaysUseEdfScheduler() const { return false; } - bool EdfLoadBalancerBase::isSlowStartEnabled() const { return slow_start_window_ > std::chrono::milliseconds(0); } diff --git a/source/extensions/load_balancing_policies/common/load_balancer_impl.h b/source/extensions/load_balancing_policies/common/load_balancer_impl.h index 7dce8d2b9414..d01e3f551203 100644 --- a/source/extensions/load_balancing_policies/common/load_balancer_impl.h +++ b/source/extensions/load_balancing_policies/common/load_balancer_impl.h @@ -492,10 +492,6 @@ class EdfLoadBalancerBase : public ZoneAwareLoadBalancerBase { virtual void refresh(uint32_t priority); - // Return `true` if refresh() should always use EDF scheduler, even if host - // weights are all equal. Default to `false`. - virtual bool alwaysUseEdfScheduler() const; - bool isSlowStartEnabled() const; bool noHostsAreInSlowStart() const; diff --git a/test/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb_test.cc b/test/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb_test.cc index f57aa3f1135f..70951d5973ba 100644 --- a/test/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb_test.cc +++ b/test/extensions/load_balancing_policies/client_side_weighted_round_robin/client_side_weighted_round_robin_lb_test.cc @@ -121,12 +121,13 @@ class ClientSideWeightedRoundRobinLoadBalancerTest : public LoadBalancerTestBase client_side_weighted_round_robin_config_.mutable_metric_names_for_computing_utilization()->Add( "metric2"); + EXPECT_CALL(mock_tls_, allocateSlot()); lb_ = std::make_shared( std::make_shared( lb_config_, cluster_info_, priority_set_, runtime_, random_, simTime()), std::make_shared( priority_set_, local_priority_set_.get(), stats_, runtime_, random_, common_config_, - lb_config_, simTime())); + lb_config_, simTime(), /*tls_shim=*/absl::nullopt)); // Initialize the thread aware load balancer from config. ASSERT_EQ(lb_->initialize(), absl::OkStatus()); @@ -161,9 +162,10 @@ class ClientSideWeightedRoundRobinLoadBalancerTest : public LoadBalancerTestBase NiceMock lb_context_; NiceMock dispatcher_; + NiceMock mock_tls_; NiceMock cluster_info_; - ClientSideWeightedRoundRobinLbConfig lb_config_ = - ClientSideWeightedRoundRobinLbConfig(client_side_weighted_round_robin_config_, dispatcher_); + ClientSideWeightedRoundRobinLbConfig lb_config_ = ClientSideWeightedRoundRobinLbConfig( + client_side_weighted_round_robin_config_, dispatcher_, mock_tls_); }; ////////////////////////////////////////////////////// @@ -216,7 +218,7 @@ TEST_P(ClientSideWeightedRoundRobinLoadBalancerTest, UpdateWeightsOneHostHasClie EXPECT_EQ(hosts[2]->weight(), 42); } -TEST_P(ClientSideWeightedRoundRobinLoadBalancerTest, UpdateWeightsDefaultIsMedianWeight) { +TEST_P(ClientSideWeightedRoundRobinLoadBalancerTest, UpdateWeightsDefaultIsOddMedianWeight) { init(false); HostVector hosts = { makeTestHost(info_, "tcp://127.0.0.1:80", simTime()), @@ -247,6 +249,36 @@ TEST_P(ClientSideWeightedRoundRobinLoadBalancerTest, UpdateWeightsDefaultIsMedia EXPECT_EQ(hosts[4]->weight(), 42); } +TEST_P(ClientSideWeightedRoundRobinLoadBalancerTest, UpdateWeightsDefaultIsEvenMedianWeight) { + init(false); + HostVector hosts = { + makeTestHost(info_, "tcp://127.0.0.1:80", simTime()), + makeTestHost(info_, "tcp://127.0.0.1:81", simTime()), + makeTestHost(info_, "tcp://127.0.0.1:82", simTime()), + makeTestHost(info_, "tcp://127.0.0.1:83", simTime()), + makeTestHost(info_, "tcp://127.0.0.1:84", simTime()), + }; + simTime().setMonotonicTime(MonotonicTime(std::chrono::seconds(30))); + // Set client side weight for first two hosts. + setHostClientSideWeight(hosts[0], 5, 5, 10); + setHostClientSideWeight(hosts[1], 42, 5, 10); + // Setting client side weights should not change the host weights. + EXPECT_EQ(hosts[0]->weight(), 1); + EXPECT_EQ(hosts[1]->weight(), 1); + EXPECT_EQ(hosts[2]->weight(), 1); + EXPECT_EQ(hosts[3]->weight(), 1); + EXPECT_EQ(hosts[4]->weight(), 1); + // Update weights on hosts. + lb_->updateWeightsOnHosts(hosts); + // First two hosts have client side weight, other hosts get the median + // weight which is average of weights of first two hosts. + EXPECT_EQ(hosts[0]->weight(), 5); + EXPECT_EQ(hosts[1]->weight(), 42); + EXPECT_EQ(hosts[2]->weight(), 23); + EXPECT_EQ(hosts[3]->weight(), 23); + EXPECT_EQ(hosts[4]->weight(), 23); +} + TEST_P(ClientSideWeightedRoundRobinLoadBalancerTest, ChooseHostWithClientSideWeights) { if (&hostSet() == &failover_host_set_) { // P = 1 does not support zone-aware routing. return; diff --git a/test/extensions/load_balancing_policies/client_side_weighted_round_robin/integration_test.cc b/test/extensions/load_balancing_policies/client_side_weighted_round_robin/integration_test.cc index 94c5c040509f..98da83e1c200 100644 --- a/test/extensions/load_balancing_policies/client_side_weighted_round_robin/integration_test.cc +++ b/test/extensions/load_balancing_policies/client_side_weighted_round_robin/integration_test.cc @@ -18,12 +18,35 @@ namespace LoadBalancingPolices { namespace ClientSideWeightedRoundRobin { namespace { +void configureClusterLoadBalancingPolicy(envoy::config::cluster::v3::Cluster& cluster) { + auto* policy = cluster.mutable_load_balancing_policy(); + + // Configure LB policy with short blackout period, long expiration period, + // and short update period. + const std::string policy_yaml = R"EOF( + policies: + - typed_extension_config: + name: envoy.load_balancing_policies.client_side_weighted_round_robin + typed_config: + "@type": type.googleapis.com/envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin + blackout_period: + seconds: 1 + weight_expiration_period: + seconds: 180 + weight_update_period: + seconds: 1 + )EOF"; + + TestUtility::loadFromYaml(policy_yaml, *policy); +} + class ClientSideWeightedRoundRobinIntegrationTest - : public testing::TestWithParam, + : public testing::TestWithParam>, public HttpIntegrationTest { public: ClientSideWeightedRoundRobinIntegrationTest() - : HttpIntegrationTest(Http::CodecType::HTTP1, GetParam()) { + : HttpIntegrationTest(Http::CodecType::HTTP1, std::get<0>(GetParam())) { + qps_multiplier_ = std::get<1>(GetParam()); // Create 3 different upstream server for stateful session test. setUpstreamCount(3); } @@ -53,29 +76,12 @@ class ClientSideWeightedRoundRobinIntegrationTest port_value: 0 )EOF"; - const std::string local_address = Network::Test::getLoopbackAddressString(GetParam()); + const std::string local_address = + Network::Test::getLoopbackAddressString(std::get<0>(GetParam())); TestUtility::loadFromYaml( fmt::format(endpoints_yaml, local_address, local_address, local_address), *endpoint); - auto* policy = cluster_0->mutable_load_balancing_policy(); - - // Configure LB policy with short blackout period, long expiration period, - // and short update period. - const std::string policy_yaml = R"EOF( - policies: - - typed_extension_config: - name: envoy.load_balancing_policies.client_side_weighted_round_robin - typed_config: - "@type": type.googleapis.com/envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin - blackout_period: - seconds: 1 - weight_expiration_period: - seconds: 180 - weight_update_period: - seconds: 1 - )EOF"; - - TestUtility::loadFromYaml(policy_yaml, *policy); + configureClusterLoadBalancingPolicy(*cluster_0); }); HttpIntegrationTest::initialize(); @@ -119,7 +125,7 @@ class ClientSideWeightedRoundRobinIntegrationTest // weights will be different. upstream_request_->encodeHeaders( responseHeadersWithLoadReport(upstream_index.value(), 0.5, - 1 * (upstream_index.value() + 1)), + qps_multiplier_ * (upstream_index.value() + 1)), true); ASSERT_TRUE(response->waitForEndStream()); @@ -154,17 +160,251 @@ class ClientSideWeightedRoundRobinIntegrationTest EXPECT_LT(weighted_usage[0], weighted_usage[1]); EXPECT_LT(weighted_usage[1], weighted_usage[2]); } + + int qps_multiplier_ = 1; }; -INSTANTIATE_TEST_SUITE_P(IpVersions, ClientSideWeightedRoundRobinIntegrationTest, - testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), - TestUtility::ipTestParamsToString); +INSTANTIATE_TEST_SUITE_P( + IpVersions, ClientSideWeightedRoundRobinIntegrationTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + testing::ValuesIn({1, 100, 10000}))); TEST_P(ClientSideWeightedRoundRobinIntegrationTest, NormalLoadBalancing) { initializeConfig(); runNormalLoadBalancing(); } +// Tests to verify the behavior of load balancing policy when cluster is added, +// removed, and added again. +class ClientSideWeightedRoundRobinXdsIntegrationTest + : public testing::TestWithParam>, + public HttpIntegrationTest { +public: + ClientSideWeightedRoundRobinXdsIntegrationTest() + : HttpIntegrationTest(Http::CodecType::HTTP1, std::get<0>(GetParam()), config()), + deferred_cluster_creation_(std::get<1>(GetParam())) { + use_lds_ = false; + } + + void TearDown() override { cleanUpXdsConnection(); } + + void initialize() override { + use_lds_ = false; + setUpstreamCount(2); // the CDS cluster + setUpstreamProtocol(Http::CodecType::HTTP2); // CDS uses gRPC uses HTTP2. + + defer_listener_finalization_ = true; + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + bootstrap.mutable_cluster_manager()->set_enable_deferred_cluster_creation( + deferred_cluster_creation_); + }); + HttpIntegrationTest::initialize(); + + addFakeUpstream(Http::CodecType::HTTP2); + addFakeUpstream(Http::CodecType::HTTP2); + cluster1_ = ConfigHelper::buildStaticCluster( + FirstClusterName, fake_upstreams_[FirstUpstreamIndex]->localAddress()->ip()->port(), + Network::Test::getLoopbackAddressString(version_)); + configureClusterLoadBalancingPolicy(cluster1_); + + cluster2_ = ConfigHelper::buildStaticCluster( + SecondClusterName, fake_upstreams_[SecondUpstreamIndex]->localAddress()->ip()->port(), + Network::Test::getLoopbackAddressString(version_)); + configureClusterLoadBalancingPolicy(cluster2_); + + // Let Envoy establish its connection to the CDS server. + acceptXdsConnection(); + + // Do the initial compareDiscoveryRequest / sendDiscoveryResponse for + // cluster_1. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "55"); + + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + + // Wait for our statically specified listener to become ready, and register + // its port in the test framework's downstream listener port map. + test_server_->waitUntilListenersReady(); + registerTestServerPorts({"http"}); + } + + void acceptXdsConnection() { + // xds_connection_ is filled with the new FakeHttpConnection. + AssertionResult result = + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + } + + const char* FirstClusterName = "cluster_1"; + const char* SecondClusterName = "cluster_2"; + // Index in fake_upstreams_ + const int FirstUpstreamIndex = 2; + const int SecondUpstreamIndex = 3; + + const std::string& config() { + CONSTRUCT_ON_FIRST_USE(std::string, fmt::format(R"EOF( +admin: + access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "{}" + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +dynamic_resources: + cds_config: + api_config_source: + api_type: GRPC + grpc_services: + envoy_grpc: + cluster_name: my_cds_cluster + set_node_on_first_message_only: true +static_resources: + clusters: + - name: my_cds_cluster + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {{}} + load_assignment: + cluster_name: my_cds_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + listeners: + - name: http + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + filters: + name: http + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: config_test + http_filters: + name: envoy.filters.http.router + codec_type: HTTP1 + route_config: + name: route_config_0 + validate_clusters: false + virtual_hosts: + name: integration + routes: + - route: + cluster: cluster_1 + match: + prefix: "/cluster1" + - route: + cluster: cluster_2 + match: + prefix: "/cluster2" + domains: "*" +)EOF", + Platform::null_device_path)); + } + + const bool deferred_cluster_creation_; + envoy::config::cluster::v3::Cluster cluster1_; + envoy::config::cluster::v3::Cluster cluster2_; +}; + +TEST_P(ClientSideWeightedRoundRobinXdsIntegrationTest, ClusterUpDownUp) { + // Calls our initialize(), which includes establishing a listener, route, and + // cluster. + testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/cluster1"); + cleanupUpstreamAndDownstream(); + ASSERT_TRUE(codec_client_->waitForDisconnect()); + + // Tell Envoy that cluster_1 is gone. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, {}, {}, + {FirstClusterName}, "42"); + // We can continue the test once we're sure that Envoy's ClusterManager has + // made use of the DiscoveryResponse that says cluster_1 is gone. + test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1); + + // Now that cluster_1 is gone, the listener (with its routing to cluster_1) + // should 503. + BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( + lookupPort("http"), "GET", "/cluster1", "", downstream_protocol_, version_, "foo.com"); + ASSERT_TRUE(response->complete()); + EXPECT_EQ("503", response->headers().getStatusValue()); + + cleanupUpstreamAndDownstream(); + ASSERT_TRUE(codec_client_->waitForDisconnect()); + + // Tell Envoy that cluster_1 is back. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "42", {}, {}, {})); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster1_}, {cluster1_}, {}, "413"); + + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); + testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/cluster1"); + + cleanupUpstreamAndDownstream(); + + // runNormalLoadBalancing(); +} + +// Tests adding a cluster, adding another, then removing and adding back the first. +TEST_P(ClientSideWeightedRoundRobinXdsIntegrationTest, TwoClusters) { + // Calls our initialize(), which includes establishing a listener, route, and + // cluster. + testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/cluster1"); + cleanupUpstreamAndDownstream(); + ASSERT_TRUE(codec_client_->waitForDisconnect()); + + // Tell Envoy that cluster_2 is here. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster2_}, {}, "42"); + // Wait for the cluster to be active (two upstream clusters plus the CDS + // cluster). + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + + // A request for the second cluster should be fine. + testRouterHeaderOnlyRequestAndResponse(nullptr, SecondUpstreamIndex, "/cluster2"); + cleanupUpstreamAndDownstream(); + ASSERT_TRUE(codec_client_->waitForDisconnect()); + + // Tell Envoy that cluster_1 is gone. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "42", {}, {}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {cluster2_}, {}, {FirstClusterName}, "43"); + // We can continue the test once we're sure that Envoy's ClusterManager has + // made use of the DiscoveryResponse that says cluster_1 is gone. + test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1); + + testRouterHeaderOnlyRequestAndResponse(nullptr, SecondUpstreamIndex, "/cluster2"); + cleanupUpstreamAndDownstream(); + ASSERT_TRUE(codec_client_->waitForDisconnect()); + + // Tell Envoy that cluster_1 is back. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "43", {}, {}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster1_}, {}, "413"); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + testRouterHeaderOnlyRequestAndResponse(nullptr, FirstUpstreamIndex, "/cluster1"); + cleanupUpstreamAndDownstream(); +} + +INSTANTIATE_TEST_SUITE_P( + IpVersions, ClientSideWeightedRoundRobinXdsIntegrationTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool())); + } // namespace } // namespace ClientSideWeightedRoundRobin } // namespace LoadBalancingPolices