tcp: move code from .h file to .cc file
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
alyssawilk committed Jan 4, 2024
1 parent ced529e commit a734d3c
Showing 3 changed files with 93 additions and 78 deletions.
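
The change is the standard header-to-source refactor: each inline method body in conn_pool.h is cut down to a bare declaration, and the definition moves into conn_pool.cc. A minimal sketch of the pattern, using a hypothetical Widget class rather than the classes in this diff:

// widget.h -- before: the definition sits inline in the class body, so every
// includer recompiles when it changes.
class Widget {
public:
  int area() const { return width_ * height_; }

private:
  int width_{0};
  int height_{0};
};

// widget.h -- after: declaration only.
class Widget {
public:
  int area() const;

private:
  int width_{0};
  int height_{0};
};

// widget.cc -- after: the definition gets a single home in one translation unit.
#include "widget.h"

int Widget::area() const { return width_ * height_; }

Besides lighter rebuilds, out-of-line bodies are attributed to the .cc file in coverage reports, and the dropped entry in test/per_file_coverage.sh (the last file in this diff) suggests the directory's measured coverage now clears the script's default bar.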
84 changes: 84 additions & 0 deletions source/common/tcp/conn_pool.cc
@@ -52,6 +52,21 @@ ActiveTcpClient::~ActiveTcpClient() {
}
}

// Undo the readDisable done in onEvent(Connected) - now that there is an
// associated connection, drain any data.
void ActiveTcpClient::readEnableIfNew() {
  // Envoy's own use of ActiveTcpClient is expected to call this function only
  // once. Other users of the TcpConnPool may recycle TCP connections, and this
  // guard protects them against read-enabling more than once.
  if (!associated_before_) {
    associated_before_ = true;
    connection_->readDisable(false);
    // While we are at it, make sure the connection will proxy all TCP data
    // before acting on a FIN.
    connection_->detectEarlyCloseWhenReadDisabled(false);
  }
}
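
The associated_before_ guard matters because read disabling on Envoy's Network::Connection is counted: every readDisable(true) must be balanced by exactly one readDisable(false), and an extra enable would underflow the balance. A toy model of that invariant (a simplified stand-in, not the real Network::ConnectionImpl):

#include <cassert>

// Toy counted read-disable gate mirroring the balance invariant the guard
// above protects; reads are enabled only when no disables are outstanding.
struct CountedReadGate {
  int disable_count{0};
  void readDisable(bool disable) {
    if (disable) {
      ++disable_count;
    } else {
      assert(disable_count > 0 && "read-enabled more times than disabled");
      --disable_count;
    }
  }
  bool readEnabled() const { return disable_count == 0; }
};

int main() {
  CountedReadGate gate;
  gate.readDisable(true);  // the pool parks the connection on connect
  gate.readDisable(false); // the first association drains buffered data
  assert(gate.readEnabled());
  // A second readDisable(false) here would trip the assert, which is what
  // associated_before_ prevents for pools that recycle connections.
  return 0;
}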

void ActiveTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }

void ActiveTcpClient::clearCallbacks() {
@@ -117,5 +132,74 @@ void ActiveTcpClient::setIdleTimer() {
}
}

void ConnPoolImpl::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
  drainConnectionsImpl(drain_behavior);
  if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
    return;
  }
  // Legacy behavior for the TCP connection pool marks all connecting clients
  // as draining.
  for (auto& connecting_client : connecting_clients_) {
    if (connecting_client->remaining_streams_ > 1) {
      uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
      connecting_client->remaining_streams_ = 1;
      if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
        decrConnectingAndConnectedStreamCapacity(
            old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
      }
    }
  }
}
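
To follow the arithmetic above, the sketch below assumes effectiveConcurrentStreamLimit() is the minimum of remaining_streams_ and a configured per-connection concurrent-stream limit; the real definition lives in ConnPoolImplBase and has more inputs, so treat this as illustrative only.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for ActiveClient::effectiveConcurrentStreamLimit().
uint64_t effectiveLimit(uint64_t remaining_streams, uint64_t concurrent_limit) {
  return std::min(remaining_streams, concurrent_limit);
}

int main() {
  const uint64_t concurrent_limit = 3;
  uint64_t remaining_streams = 5;
  const uint64_t old_limit = effectiveLimit(remaining_streams, concurrent_limit); // 3
  remaining_streams = 1; // draining caps the client at one final stream
  const uint64_t new_limit = effectiveLimit(remaining_streams, concurrent_limit); // 1
  // The pool's connecting-stream capacity is decremented by the difference.
  std::cout << "capacity decremented by " << old_limit - new_limit << "\n"; // 2
  return 0;
}

When the configured limit is already 1, the old and new limits are equal and no decrement is issued, which is exactly what the inner if guards against.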

void ConnPoolImpl::closeConnections() {
  for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
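    // close() is expected to unlink the client from its owning list via the
    // resulting connection close event, so popping from the front drains the
    // list without invalidating iteration.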
    while (!list->empty()) {
      list->front()->close();
    }
  }
}
ConnectionPool::Cancellable*
ConnPoolImpl::newConnection(Tcp::ConnectionPool::Callbacks& callbacks) {
  TcpAttachContext context(&callbacks);
  // TLS early data over TCP is not supported yet.
  return newStreamImpl(context, /*can_send_early_data=*/false);
}

ConnectionPool::Cancellable*
ConnPoolImpl::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
                               bool can_send_early_data) {
  Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
      *this, can_send_early_data, typedContext<TcpAttachContext>(context));
  return addPendingStream(std::move(pending_stream));
}

Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
  return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
                                           1, idle_timeout_);
}

void ConnPoolImpl::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
                               Envoy::ConnectionPool::AttachContext& context) {
  ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
  tcp_client->readEnableIfNew();
  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
  std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
      std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
  callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);

  // If the callbacks took ownership of the connection data, the tcp client is
  // taken over; stop its idle timer.
  if (!connection_data) {
    tcp_client->disableIdleTimer();
  }
}
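
The null check works because the callbacks receive the connection data by rvalue reference, and std::move is only a cast: a callback that never moves from the parameter leaves connection_data non-null, signalling that it declined ownership. A self-contained sketch of that handshake (toy types, not Envoy's interfaces):

#include <iostream>
#include <memory>

// A callee taking unique_ptr<T>&& binds the reference without moving, so it
// may decline ownership and leave the caller's pointer intact.
void maybeTake(std::unique_ptr<int>&& data, bool take) {
  if (take) {
    std::unique_ptr<int> mine = std::move(data); // ownership transfers here
  }
}

int main() {
  auto data = std::make_unique<int>(42);
  maybeTake(std::move(data), /*take=*/false);
  // Mirrors the `if (!connection_data)` test above: non-null means declined.
  std::cout << (data ? "caller still owns the data" : "callee took the data") << "\n";
  return 0;
}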

void ConnPoolImpl::onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
                                 absl::string_view failure_reason,
                                 ConnectionPool::PoolFailureReason reason,
                                 Envoy::ConnectionPool::AttachContext& context) {
  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
  callbacks->onPoolFailure(reason, failure_reason, host_description);
}

} // namespace Tcp
} // namespace Envoy
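
Putting the pieces together, the caller-side contract of newConnection() can be modeled with a small self-contained program. Everything here is a toy stand-in for illustration; none of these types are Envoy's real interfaces:

#include <iostream>
#include <memory>
#include <string>

// Toy stand-ins; the real interfaces live in Envoy's conn_pool headers and
// differ in detail (host descriptions, cancellation handles, and so on).
struct ConnectionData {
  std::string peer_address;
};
using ConnectionDataPtr = std::unique_ptr<ConnectionData>;

class Callbacks {
public:
  virtual ~Callbacks() = default;
  virtual void onPoolReady(ConnectionDataPtr&& conn_data, const std::string& host) = 0;
  virtual void onPoolFailure(const std::string& reason, const std::string& host) = 0;
};

// A pool reduced to its handoff: connect, then surrender the connection data.
class ToyPool {
public:
  void newConnection(Callbacks& callbacks) {
    if (connect_succeeds_) {
      callbacks.onPoolReady(std::make_unique<ConnectionData>(ConnectionData{"10.0.0.1:443"}),
                            "upstream-host");
    } else {
      callbacks.onPoolFailure("connect timeout", "upstream-host");
    }
  }
  bool connect_succeeds_{true};
};

class Client : public Callbacks {
public:
  void onPoolReady(ConnectionDataPtr&& conn_data, const std::string& host) override {
    ConnectionDataPtr owned = std::move(conn_data); // take ownership, as most callers do
    std::cout << "ready: " << owned->peer_address << " on " << host << "\n";
  }
  void onPoolFailure(const std::string& reason, const std::string& host) override {
    std::cout << "failed to reach " << host << ": " << reason << "\n";
  }
};

int main() {
  ToyPool pool;
  Client client;
  pool.newConnection(client);
  return 0;
}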
86 changes: 9 additions & 77 deletions source/common/tcp/conn_pool.h
@@ -97,20 +97,8 @@ class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
void onAboveWriteBufferHighWatermark() override { callbacks_->onAboveWriteBufferHighWatermark(); }
void onBelowWriteBufferLowWatermark() override { callbacks_->onBelowWriteBufferLowWatermark(); }

  // Undo the readDisable done in onEvent(Connected) - now that there is an associated connection,
  // drain any data.
  void readEnableIfNew() {
    // It is expected for Envoy use of ActiveTcpClient this function only be
    // called once. Other users of the TcpConnPool may recycle Tcp connections,
    // and this safeguards them against read-enabling too many times.
    if (!associated_before_) {
      associated_before_ = true;
      connection_->readDisable(false);
      // Also while we're at it, make sure the connection will proxy all TCP
      // data before picking up a FIN.
      connection_->detectEarlyCloseWhenReadDisabled(false);
    }
  }
  // Undoes the readDisable done in onEvent(Connected).
  void readEnableIfNew();

void initializeReadFilters() override { connection_->initializeReadFilters(); }
absl::optional<Http::Protocol> protocol() const override { return {}; }
@@ -163,79 +151,23 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,

void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
bool isIdle() const override { return isIdleImpl(); }
  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
    drainConnectionsImpl(drain_behavior);
    if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
      return;
    }
    // Legacy behavior for the TCP connection pool marks all connecting clients
    // as draining.
    for (auto& connecting_client : connecting_clients_) {
      if (connecting_client->remaining_streams_ > 1) {
        uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
        connecting_client->remaining_streams_ = 1;
        if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
          decrConnectingAndConnectedStreamCapacity(
              old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
        }
      }
    }
  }

  void closeConnections() override {
    for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
      while (!list->empty()) {
        list->front()->close();
      }
    }
  }
  ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override {
    TcpAttachContext context(&callbacks);
    // TLS early data over TCP is not supported yet.
    return newStreamImpl(context, /*can_send_early_data=*/false);
  }
  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override;
  void closeConnections() override;
  ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override;
  bool maybePreconnect(float preconnect_ratio) override {
    return maybePreconnectImpl(preconnect_ratio);
  }

  ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
                                                bool can_send_early_data) override {
    Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
        *this, can_send_early_data, typedContext<TcpAttachContext>(context));
    return addPendingStream(std::move(pending_stream));
  }

                                                bool can_send_early_data) override;
  Upstream::HostDescriptionConstSharedPtr host() const override {
    return Envoy::ConnectionPool::ConnPoolImplBase::host();
  }

  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
    return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
                                             1, idle_timeout_);
  }

  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override;
  void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
                   Envoy::ConnectionPool::AttachContext& context) override {
    ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
    tcp_client->readEnableIfNew();
    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
    std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
        std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
    callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);

    // The tcp client is taken over. Stop the idle timer.
    if (!connection_data) {
      tcp_client->disableIdleTimer();
    }
  }

                   Envoy::ConnectionPool::AttachContext& context) override;
  void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
                     absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
                     Envoy::ConnectionPool::AttachContext& context) override {
    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
    callbacks->onPoolFailure(reason, failure_reason, host_description);
  }

                     Envoy::ConnectionPool::AttachContext& context) override;
bool enforceMaxRequests() const override { return false; }
// These two functions exist for testing parity between old and new Tcp Connection Pools.
virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {}
1 change: 0 additions & 1 deletion test/per_file_coverage.sh
@@ -19,7 +19,6 @@ declare -a KNOWN_LOW_COVERAGE=(
"source/common/quic:93.4"
"source/common/secret:95.1"
"source/common/signal:87.2" # Death tests don't report LCOV
"source/common/tcp:94.5"
"source/common/thread:0.0" # Death tests don't report LCOV
"source/common/watchdog:58.6" # Death tests don't report LCOV
"source/exe:91.4"
