Skip to content

Commit

Permalink
grpc: removing exceptions from client creation (#37765)
Browse files Browse the repository at this point in the history
Risk Level: low
Testing: updated tests
Docs Changes: n/a
Release Notes: n/a
envoyproxy/envoy-mobile#176

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored Jan 9, 2025
1 parent a7a8414 commit a0c96b3
Show file tree
Hide file tree
Showing 21 changed files with 113 additions and 56 deletions.
4 changes: 2 additions & 2 deletions envoy/grpc/async_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ class AsyncClientFactory {
* exclusively. For example, some filters pass *this reference to raw client. In this case, the
* client must be destroyed before the filter instance. In this case, the grpc client must be
* owned by the filter instance exclusively.
* @return RawAsyncClientPtr async client.
* @return RawAsyncClientPtr async client or an error status.
*/
virtual RawAsyncClientPtr createUncachedRawAsyncClient() PURE;
virtual absl::StatusOr<RawAsyncClientPtr> createUncachedRawAsyncClient() PURE;

private:
friend class AsyncClientFactoryImpl;
Expand Down
26 changes: 19 additions & 7 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,37 @@
namespace Envoy {
namespace Grpc {

absl::StatusOr<std::unique_ptr<AsyncClientImpl>>
AsyncClientImpl::create(Upstream::ClusterManager& cm,
const envoy::config::core::v3::GrpcService& config,
TimeSource& time_source) {
absl::Status creation_status = absl::OkStatus();
auto ret = std::unique_ptr<AsyncClientImpl>(
new AsyncClientImpl(cm, config, time_source, creation_status));
RETURN_IF_NOT_OK(creation_status);
return ret;
}

AsyncClientImpl::AsyncClientImpl(Upstream::ClusterManager& cm,
const envoy::config::core::v3::GrpcService& config,
TimeSource& time_source)
TimeSource& time_source, absl::Status& creation_status)
: max_recv_message_length_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.envoy_grpc(), max_receive_message_length, 0)),
skip_envoy_headers_(config.envoy_grpc().skip_envoy_headers()), cm_(cm),
remote_cluster_name_(config.envoy_grpc().cluster_name()),
host_name_(config.envoy_grpc().authority()), time_source_(time_source),
metadata_parser_(THROW_OR_RETURN_VALUE(
Router::HeaderParser::configure(
config.initial_metadata(),
envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD),
Router::HeaderParserPtr)),
retry_policy_(
config.has_retry_policy()
? absl::optional<envoy::config::route::v3::
RetryPolicy>{Http::Utility::convertCoreToRouteRetryPolicy(
config.retry_policy(), "")}
: absl::nullopt) {}
: absl::nullopt) {
auto parser_or_error = Router::HeaderParser::configure(
config.initial_metadata(),
envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD);
SET_AND_RETURN_IF_NOT_OK(parser_or_error.status(), creation_status);
metadata_parser_ = std::move(*parser_or_error);
}

AsyncClientImpl::~AsyncClientImpl() {
ASSERT(isThreadSafe());
Expand Down
9 changes: 7 additions & 2 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ using AsyncStreamImplPtr = std::unique_ptr<AsyncStreamImpl>;

class AsyncClientImpl final : public RawAsyncClient {
public:
AsyncClientImpl(Upstream::ClusterManager& cm, const envoy::config::core::v3::GrpcService& config,
TimeSource& time_source);
static absl::StatusOr<std::unique_ptr<AsyncClientImpl>>
create(Upstream::ClusterManager& cm, const envoy::config::core::v3::GrpcService& config,
TimeSource& time_source);
~AsyncClientImpl() override;

// Grpc::AsyncClient
Expand All @@ -42,6 +43,10 @@ class AsyncClientImpl final : public RawAsyncClient {
return retry_policy_;
}

protected:
AsyncClientImpl(Upstream::ClusterManager& cm, const envoy::config::core::v3::GrpcService& config,
TimeSource& time_source, absl::Status& creation_status);

private:
const uint32_t max_recv_message_length_;
const bool skip_envoy_headers_;
Expand Down
14 changes: 9 additions & 5 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ AsyncClientManagerImpl::AsyncClientManagerImpl(
#endif
}

RawAsyncClientPtr AsyncClientFactoryImpl::createUncachedRawAsyncClient() {
return std::make_unique<AsyncClientImpl>(cm_, config_, time_source_);
absl::StatusOr<RawAsyncClientPtr> AsyncClientFactoryImpl::createUncachedRawAsyncClient() {
return AsyncClientImpl::create(cm_, config_, time_source_);
}

GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
Expand Down Expand Up @@ -123,7 +123,7 @@ GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
}
}

RawAsyncClientPtr GoogleAsyncClientFactoryImpl::createUncachedRawAsyncClient() {
absl::StatusOr<RawAsyncClientPtr> GoogleAsyncClientFactoryImpl::createUncachedRawAsyncClient() {
#ifdef ENVOY_GOOGLE_GRPC
GoogleGenericStubFactory stub_factory;
return std::make_unique<GoogleAsyncClientImpl>(
Expand Down Expand Up @@ -168,7 +168,9 @@ absl::StatusOr<RawAsyncClientSharedPtr> AsyncClientManagerImpl::getOrCreateRawAs
auto factory_or_error =
factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check);
RETURN_IF_NOT_OK_REF(factory_or_error.status());
client = factory_or_error.value()->createUncachedRawAsyncClient();
auto client_or_error = factory_or_error.value()->createUncachedRawAsyncClient();
RETURN_IF_NOT_OK_REF(client_or_error.status());
client = std::move(*client_or_error);
raw_async_client_cache_->setCache(config_with_hash_key, client);
return client;
}
Expand All @@ -184,7 +186,9 @@ AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey(
auto factory_or_error =
factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check);
RETURN_IF_NOT_OK_REF(factory_or_error.status());
client = factory_or_error.value()->createUncachedRawAsyncClient();
auto client_or_error = factory_or_error.value()->createUncachedRawAsyncClient();
RETURN_IF_NOT_OK_REF(client_or_error.status());
client = std::move(*client_or_error);
raw_async_client_cache_->setCache(config_with_hash_key, client);
return client;
}
Expand Down
4 changes: 2 additions & 2 deletions source/common/grpc/async_client_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class AsyncClientFactoryImpl : public AsyncClientFactory {
const envoy::config::core::v3::GrpcService& config,
bool skip_cluster_check, TimeSource& time_source,
absl::Status& creation_status);
RawAsyncClientPtr createUncachedRawAsyncClient() override;
absl::StatusOr<RawAsyncClientPtr> createUncachedRawAsyncClient() override;

private:
Upstream::ClusterManager& cm_;
Expand All @@ -36,7 +36,7 @@ class GoogleAsyncClientFactoryImpl : public AsyncClientFactory {
const envoy::config::core::v3::GrpcService& config,
Server::Configuration::CommonFactoryContext& context,
const StatNames& stat_names, absl::Status& creation_status);
RawAsyncClientPtr createUncachedRawAsyncClient() override;
absl::StatusOr<RawAsyncClientPtr> createUncachedRawAsyncClient() override;

private:
ThreadLocal::Instance& tls_;
Expand Down
53 changes: 38 additions & 15 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstr
return blocking_ads_cluster;
}

absl::Status createClients(Grpc::AsyncClientFactoryPtr& primary_factory,
Grpc::AsyncClientFactoryPtr& failover_factory,
Grpc::RawAsyncClientPtr& primary_client,
Grpc::RawAsyncClientPtr& failover_client) {
absl::StatusOr<Grpc::RawAsyncClientPtr> success = primary_factory->createUncachedRawAsyncClient();
RETURN_IF_NOT_OK_REF(success.status());
primary_client = std::move(*success);
if (failover_factory) {
success = failover_factory->createUncachedRawAsyncClient();
RETURN_IF_NOT_OK_REF(success.status());
failover_client = std::move(*success);
}
return absl::OkStatus();
}

} // namespace

void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
Expand Down Expand Up @@ -471,12 +486,15 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
RETURN_IF_NOT_OK_REF(factory_failover_or_error.status());
factory_failover = std::move(factory_failover_or_error.value());
}
ads_mux_ = factory->create(
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
Grpc::RawAsyncClientPtr primary_client;
Grpc::RawAsyncClientPtr failover_client;
RETURN_IF_NOT_OK(createClients(factory_primary_or_error.value(), factory_failover,
primary_client, failover_client));
ads_mux_ =
factory->create(std::move(primary_client), std::move(failover_client), dispatcher_,
random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
} else {
absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
RETURN_IF_NOT_OK(status);
Expand All @@ -502,10 +520,13 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
RETURN_IF_NOT_OK_REF(factory_failover_or_error.status());
factory_failover = std::move(factory_failover_or_error.value());
}
Grpc::RawAsyncClientPtr primary_client;
Grpc::RawAsyncClientPtr failover_client;
RETURN_IF_NOT_OK(createClients(factory_primary_or_error.value(), factory_failover,
primary_client, failover_client));
ads_mux_ = factory->create(
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(primary_client), std::move(failover_client), dispatcher_, random_,
*stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, use_eds_cache);
}
Expand Down Expand Up @@ -648,10 +669,10 @@ ClusterManagerImpl::replaceAdsMux(const envoy::config::core::v3::ApiConfigSource
RETURN_IF_NOT_OK_REF(factory_failover_or_error.status());
factory_failover = std::move(factory_failover_or_error.value());
}
Grpc::RawAsyncClientPtr primary_client =
factory_primary_or_error.value()->createUncachedRawAsyncClient();
Grpc::RawAsyncClientPtr failover_client =
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr;
Grpc::RawAsyncClientPtr primary_client;
Grpc::RawAsyncClientPtr failover_client;
RETURN_IF_NOT_OK(createClients(factory_primary_or_error.value(), factory_failover, primary_client,
failover_client));

// Primary client must not be null, as the primary xDS source must be a valid one.
// The failover_client may be null (no failover defined).
Expand All @@ -676,9 +697,11 @@ absl::Status ClusterManagerImpl::initializeSecondaryClusters(
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, load_stats_config, *stats_.rootScope(), false, 0);
RETURN_IF_NOT_OK_REF(factory_or_error.status());
absl::StatusOr<Grpc::RawAsyncClientPtr> client_or_error =
factory_or_error.value()->createUncachedRawAsyncClient();
RETURN_IF_NOT_OK_REF(client_or_error.status());
load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, *stats_.rootScope(),
factory_or_error.value()->createUncachedRawAsyncClient(), dispatcher_);
local_info_, *this, *stats_.rootScope(), std::move(*client_or_error), dispatcher_);
}
return absl::OkStatus();
}
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true);
THROW_IF_NOT_OK_REF(factory_or_error.status());
return std::make_shared<GrpcAccessLoggerImpl>(
factory_or_error.value()->createUncachedRawAsyncClient(), config, dispatcher, local_info_,
scope_);
THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
config, dispatcher, local_info_, scope_);
}

} // namespace GrpcCommon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
auto factory_or_error = async_client_manager_.factoryForGrpcService(
config.common_config().grpc_service(), scope_, true);
THROW_IF_NOT_OK_REF(factory_or_error.status());
auto client = factory_or_error.value()->createUncachedRawAsyncClient();
auto client = THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr);
return std::make_shared<GrpcAccessLoggerImpl>(std::move(client), config, dispatcher, local_info_,
scope_);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
GrpcMuxContext grpc_mux_context{
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
THROW_OR_RETURN_VALUE(factory_primary_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
/*failover_async_client_=*/nullptr,
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
GrpcMuxContext grpc_mux_context{
/*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*async_client_=*/THROW_OR_RETURN_VALUE(
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
/*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/sotwGrpcMethod(data.type_url_),
Expand Down Expand Up @@ -82,7 +84,9 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
GrpcMuxContext grpc_mux_context{
/*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*async_client_=*/THROW_OR_RETURN_VALUE(
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
/*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/network/ext_authz/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ Network::FilterFactoryCb ExtAuthzConfigFactory::createFilterFactoryFromProtoType
.factoryForGrpcService(grpc_service, context.scope(), true);
THROW_IF_NOT_OK_REF(factory_or_error.status());
auto client = std::make_unique<Filters::Common::ExtAuthz::GrpcClientImpl>(
factory_or_error.value()->createUncachedRawAsyncClient(),
THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
std::chrono::milliseconds(timeout_ms));
filter_manager.addReadFilter(Network::ReadFilterSharedPtr{
std::make_shared<Filter>(ext_authz_config, std::move(client))});
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/network/redis_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP
THROW_IF_NOT_OK_REF(auth_client_factory_or_error.status());

auth_client = std::make_unique<ExternalAuth::GrpcExternalAuthClient>(
auth_client_factory_or_error.value()->createUncachedRawAsyncClient(),
THROW_OR_RETURN_VALUE(
auth_client_factory_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
std::chrono::milliseconds(timeout_ms));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Driver::Driver(const envoy::config::trace::v3::OpenTelemetryConfig& opentelemetr
THROW_IF_NOT_OK_REF(factory_or_error.status());
Grpc::AsyncClientFactoryPtr&& factory = std::move(factory_or_error.value());
const Grpc::RawAsyncClientSharedPtr& async_client_shared_ptr =
factory->createUncachedRawAsyncClient();
THROW_OR_RETURN_VALUE(factory->createUncachedRawAsyncClient(), Grpc::RawAsyncClientPtr);
exporter = std::make_unique<OpenTelemetryGrpcTraceExporter>(async_client_shared_ptr);
} else if (opentelemetry_config.has_http_service()) {
exporter = std::make_unique<OpenTelemetryHttpTraceExporter>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ TraceSegmentReporter::TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory
Random::RandomGenerator& random_generator,
SkyWalkingTracerStatsSharedPtr stats,
uint32_t delayed_buffer_size, const std::string& token)
: tracing_stats_(stats), client_(factory->createUncachedRawAsyncClient()),
: tracing_stats_(stats), client_(THROW_OR_RETURN_VALUE(factory->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr)),
service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"skywalking.v3.TraceSegmentReportService.collect")),
random_generator_(random_generator), token_(token),
Expand Down
9 changes: 5 additions & 4 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -840,10 +840,11 @@ void InstanceBase::onRuntimeReady() {
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, hds_config, *stats_store_.rootScope(), false, 0);
THROW_IF_NOT_OK_REF(factory_or_error.status());
hds_delegate_ =
maybeCreateHdsDelegate(serverFactoryContext(), *stats_store_.rootScope(),
factory_or_error.value()->createUncachedRawAsyncClient(),
stats_store_, *ssl_context_manager_);
hds_delegate_ = maybeCreateHdsDelegate(
serverFactoryContext(), *stats_store_.rootScope(),
THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(),
Grpc::RawAsyncClientPtr),
stats_store_, *ssl_context_manager_);
}
END_TRY
CATCH(const EnvoyException& e,
Expand Down
4 changes: 2 additions & 2 deletions test/common/grpc/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class EnvoyAsyncClientImplTest : public testing::Test {
initial_metadata_entry.set_key("downstream-local-address");
initial_metadata_entry.set_value("%DOWNSTREAM_LOCAL_ADDRESS_WITHOUT_PORT%");

grpc_client_ = std::make_unique<AsyncClientImpl>(cm_, config, test_time_.timeSystem());
grpc_client_ = *AsyncClientImpl::create(cm_, config, test_time_.timeSystem());
cm_.initializeThreadLocalClusters({"test_cluster"});
ON_CALL(cm_.thread_local_cluster_, httpAsyncClient()).WillByDefault(ReturnRef(http_client_));
}
Expand Down Expand Up @@ -90,7 +90,7 @@ TEST_F(EnvoyAsyncClientImplTest, HostIsOverrideByConfig) {
config.mutable_envoy_grpc()->set_cluster_name("test_cluster");
config.mutable_envoy_grpc()->set_authority("demo.com");

grpc_client_ = std::make_unique<AsyncClientImpl>(cm_, config, test_time_.timeSystem());
grpc_client_ = *AsyncClientImpl::create(cm_, config, test_time_.timeSystem());
EXPECT_CALL(cm_.thread_local_cluster_, httpAsyncClient()).WillRepeatedly(ReturnRef(http_client_));

NiceMock<MockAsyncStreamCallbacks<helloworld::HelloReply>> grpc_callbacks;
Expand Down
Loading

0 comments on commit a0c96b3

Please sign in to comment.