diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc
index cb970e49a3b2..cd8d5f80b611 100644
--- a/source/common/tcp/conn_pool.cc
+++ b/source/common/tcp/conn_pool.cc
@@ -52,6 +52,21 @@ ActiveTcpClient::~ActiveTcpClient() {
   }
 }
 
+// Undo the readDisable done in onEvent(Connected): now that there is an associated connection,
+// drain any data.
+void ActiveTcpClient::readEnableIfNew() {
+  // For Envoy's use of ActiveTcpClient, this function is expected to be called
+  // only once. Other users of the TcpConnPool may recycle Tcp connections,
+  // and this safeguards them against read-enabling too many times.
+  if (!associated_before_) {
+    associated_before_ = true;
+    connection_->readDisable(false);
+    // Also, while we're at it, make sure the connection will proxy all TCP
+    // data before picking up a FIN.
+    connection_->detectEarlyCloseWhenReadDisabled(false);
+  }
+}
+
 void ActiveTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }
 
 void ActiveTcpClient::clearCallbacks() {
@@ -117,5 +132,74 @@ void ActiveTcpClient::setIdleTimer() {
   }
 }
 
+void ConnPoolImpl::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
+  drainConnectionsImpl(drain_behavior);
+  if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
+    return;
+  }
+  // Legacy behavior for the TCP connection pool marks all connecting clients
+  // as draining.
+  for (auto& connecting_client : connecting_clients_) {
+    if (connecting_client->remaining_streams_ > 1) {
+      uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
+      connecting_client->remaining_streams_ = 1;
+      if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
+        decrConnectingAndConnectedStreamCapacity(
+            old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
+      }
+    }
+  }
+}
+
+void ConnPoolImpl::closeConnections() {
+  for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
+    while (!list->empty()) {
+      list->front()->close();
+    }
+  }
+}
+ConnectionPool::Cancellable*
+ConnPoolImpl::newConnection(Tcp::ConnectionPool::Callbacks& callbacks) {
+  TcpAttachContext context(&callbacks);
+  // TLS early data over TCP is not supported yet.
+  return newStreamImpl(context, /*can_send_early_data=*/false);
+}
+
+ConnectionPool::Cancellable*
+ConnPoolImpl::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
+                               bool can_send_early_data) {
+  Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
+      *this, can_send_early_data, typedContext<TcpAttachContext>(context));
+  return addPendingStream(std::move(pending_stream));
+}
+
+Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
+  return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
+                                           1, idle_timeout_);
+}
+
+void ConnPoolImpl::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
+                               Envoy::ConnectionPool::AttachContext& context) {
+  ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
+  tcp_client->readEnableIfNew();
+  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
+  std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
+      std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
+  callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);
+
+  // The tcp client is taken over. Stop the idle timer.
+  if (!connection_data) {
+    tcp_client->disableIdleTimer();
+  }
+}
+
+void ConnPoolImpl::onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
+                                 absl::string_view failure_reason,
+                                 ConnectionPool::PoolFailureReason reason,
+                                 Envoy::ConnectionPool::AttachContext& context) {
+  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
+  callbacks->onPoolFailure(reason, failure_reason, host_description);
+}
+
 } // namespace Tcp
 } // namespace Envoy
diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h
index 75744ceafd10..958d6dbb5a7f 100644
--- a/source/common/tcp/conn_pool.h
+++ b/source/common/tcp/conn_pool.h
@@ -97,20 +97,8 @@ class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
   void onAboveWriteBufferHighWatermark() override { callbacks_->onAboveWriteBufferHighWatermark(); }
   void onBelowWriteBufferLowWatermark() override { callbacks_->onBelowWriteBufferLowWatermark(); }
 
-  // Undo the readDisable done in onEvent(Connected) - now that there is an associated connection,
-  // drain any data.
-  void readEnableIfNew() {
-    // It is expected for Envoy use of ActiveTcpClient this function only be
-    // called once. Other users of the TcpConnPool may recycle Tcp connections,
-    // and this safeguards them against read-enabling too many times.
-    if (!associated_before_) {
-      associated_before_ = true;
-      connection_->readDisable(false);
-      // Also while we're at it, make sure the connection will proxy all TCP
-      // data before picking up a FIN.
-      connection_->detectEarlyCloseWhenReadDisabled(false);
-    }
-  }
+  // Undoes the readDisable done in onEvent(Connected).
+  void readEnableIfNew();
 
   void initializeReadFilters() override { connection_->initializeReadFilters(); }
   absl::optional<Http::Protocol> protocol() const override { return {}; }
@@ -163,79 +151,23 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
   void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
   bool isIdle() const override { return isIdleImpl(); }
 
-  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
-    drainConnectionsImpl(drain_behavior);
-    if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
-      return;
-    }
-    // Legacy behavior for the TCP connection pool marks all connecting clients
-    // as draining.
-    for (auto& connecting_client : connecting_clients_) {
-      if (connecting_client->remaining_streams_ > 1) {
-        uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
-        connecting_client->remaining_streams_ = 1;
-        if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
-          decrConnectingAndConnectedStreamCapacity(
-              old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
-        }
-      }
-    }
-  }
-
-  void closeConnections() override {
-    for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
-      while (!list->empty()) {
-        list->front()->close();
-      }
-    }
-  }
-  ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override {
-    TcpAttachContext context(&callbacks);
-    // TLS early data over TCP is not supported yet.
-    return newStreamImpl(context, /*can_send_early_data=*/false);
-  }
+  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override;
+  void closeConnections() override;
+  ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override;
   bool maybePreconnect(float preconnect_ratio) override {
     return maybePreconnectImpl(preconnect_ratio);
   }
   ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
-                                                bool can_send_early_data) override {
-    Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
-        *this, can_send_early_data, typedContext<TcpAttachContext>(context));
-    return addPendingStream(std::move(pending_stream));
-  }
-
+                                                bool can_send_early_data) override;
   Upstream::HostDescriptionConstSharedPtr host() const override {
     return Envoy::ConnectionPool::ConnPoolImplBase::host();
   }
-
-  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
-    return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
-                                             1, idle_timeout_);
-  }
-
+  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;
   void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
-                   Envoy::ConnectionPool::AttachContext& context) override {
-    ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
-    tcp_client->readEnableIfNew();
-    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
-    std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
-        std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
-    callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);
-
-    // The tcp client is taken over. Stop the idle timer.
-    if (!connection_data) {
-      tcp_client->disableIdleTimer();
-    }
-  }
-
+                   Envoy::ConnectionPool::AttachContext& context) override;
   void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
                      absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
-                     Envoy::ConnectionPool::AttachContext& context) override {
-    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
-    callbacks->onPoolFailure(reason, failure_reason, host_description);
-  }
-
+                     Envoy::ConnectionPool::AttachContext& context) override;
   bool enforceMaxRequests() const override { return false; }
   // These two functions exist for testing parity between old and new Tcp Connection Pools.
   virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {}
diff --git a/test/per_file_coverage.sh b/test/per_file_coverage.sh
index ae83388bef9e..7253b2a12d32 100755
--- a/test/per_file_coverage.sh
+++ b/test/per_file_coverage.sh
@@ -19,7 +19,6 @@ declare -a KNOWN_LOW_COVERAGE=(
 "source/common/quic:93.4"
 "source/common/secret:95.1"
 "source/common/signal:87.2" # Death tests don't report LCOV
-"source/common/tcp:94.5"
 "source/common/thread:0.0" # Death tests don't report LCOV
 "source/common/watchdog:58.6" # Death tests don't report LCOV
 "source/exe:91.4"
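
The drain bookkeeping moved into ConnPoolImpl::drainConnections() above is easy to misread: capping remaining_streams_ at 1 only releases pool capacity when the cap actually lowers the client's effective limit, because capacity is the minimum of the streams a client may still serve and its per-connection concurrency limit. Below is a minimal, self-contained sketch of that pattern; FakeClient, effectiveLimit(), and pool_capacity are hypothetical names for illustration only, not Envoy APIs.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for a connecting client; not an Envoy type.
struct FakeClient {
  uint64_t remaining_streams_;        // streams this client may still serve
  uint64_t concurrent_stream_limit_;  // per-connection concurrency limit

  // Mirrors the role of effectiveConcurrentStreamLimit(): capacity is bounded
  // both by the concurrency limit and by how many streams remain.
  uint64_t effectiveLimit() const {
    return std::min(remaining_streams_, concurrent_stream_limit_);
  }
};

int main() {
  std::vector<FakeClient> connecting = {{10, 4}, {1, 4}, {3, 2}};
  uint64_t pool_capacity = 0;
  for (const auto& c : connecting) {
    pool_capacity += c.effectiveLimit();  // 4 + 1 + 2 = 7
  }

  // Drain: cap every connecting client to a single remaining stream and give
  // back only the capacity the cap actually removed, following the pattern of
  // the legacy TCP pool behavior in drainConnections().
  for (auto& c : connecting) {
    if (c.remaining_streams_ > 1) {
      const uint64_t old_limit = c.effectiveLimit();
      c.remaining_streams_ = 1;
      if (c.effectiveLimit() < old_limit) {
        pool_capacity -= old_limit - c.effectiveLimit();
      }
    }
  }
  std::cout << "capacity after drain: " << pool_capacity << "\n";  // prints 3
  return 0;
}

Note the remaining_streams_ > 1 guard: a client already down to one remaining stream has no capacity to give back, so skipping it avoids decrementing the shared counter twice.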