From 55255670acc5e8311b89899dbd610d745cae5f8d Mon Sep 17 00:00:00 2001 From: hexiaofeng Date: Sat, 28 Dec 2024 16:38:46 +0800 Subject: [PATCH] Improve performance by cache grpc endpoint --- .../router/gprc/instance/GrpcEndpoint.java | 74 +++++++++++--- .../gprc/loadbalance/LiveLoadBalancer.java | 78 ++++++++------- .../router/gprc/loadbalance/LiveRef.java | 31 ++++-- .../router/gprc/loadbalance/LiveRequest.java | 13 ++- .../gprc/loadbalance/LiveSubchannel.java | 99 ------------------- .../loadbalance/LiveSubchannelPicker.java | 18 ++-- 6 files changed, 138 insertions(+), 175 deletions(-) delete mode 100644 joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannel.java diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/instance/GrpcEndpoint.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/instance/GrpcEndpoint.java index 1bbd4a624..be3a577aa 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/instance/GrpcEndpoint.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/instance/GrpcEndpoint.java @@ -2,13 +2,20 @@ import com.jd.live.agent.governance.instance.AbstractEndpoint; import com.jd.live.agent.governance.instance.EndpointState; -import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveSubchannel; +import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRef; import io.grpc.Attributes.Key; +import io.grpc.ConnectivityState; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Subchannel; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static io.grpc.ConnectivityState.IDLE; + /** * A gRPC endpoint that provides a way to expose services over gRPC. */ @@ -16,29 +23,22 @@ public class GrpcEndpoint extends AbstractEndpoint { public static final Map> KEYS = new ConcurrentHashMap<>(); - private final LiveSubchannel subchannel; + private final Subchannel subchannel; - private final InetSocketAddress address; + private InetSocketAddress socketAddress; - public GrpcEndpoint(LiveSubchannel subchannel) { + public GrpcEndpoint(Subchannel subchannel) { this.subchannel = subchannel; - this.address = subchannel.getAddress(); } @Override public String getHost() { - if (address != null) { - return address.getHostString(); - } - return null; + return socketAddress == null ? null : socketAddress.getHostString(); } @Override public int getPort() { - if (address != null) { - return address.getPort(); - } - return 0; + return socketAddress == null ? 0 : socketAddress.getPort(); } @Override @@ -51,8 +51,54 @@ public EndpointState getState() { return EndpointState.HEALTHY; } - public LiveSubchannel getSubchannel() { + public Subchannel getSubchannel() { return subchannel; } + public void requestConnection() { + subchannel.requestConnection(); + } + + public void shutdown() { + subchannel.shutdown(); + } + + public void start(LoadBalancer.SubchannelStateListener listener) { + subchannel.start(listener); + socketAddress = getInetSocketAddress(subchannel); + } + + /** + * Gets the current ConnectivityState. + * + * @return the current ConnectivityState, or IDLE if no state is set + */ + public ConnectivityState getConnectivityState() { + LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); + return ref == null ? IDLE : ref.getState(); + } + + /** + * Sets the ConnectivityState to the specified newState. + * + * @param newState the new ConnectivityState to set + */ + public void setConnectivityState(ConnectivityState newState) { + LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); + if (ref != null) { + ref.setState(newState); + } + } + + private static InetSocketAddress getInetSocketAddress(Subchannel subchannel) { + + List addresses = subchannel.getAllAddresses().get(0).getAddresses(); + for (SocketAddress addr : addresses) { + if (addr instanceof InetSocketAddress) { + return (InetSocketAddress) addr; + } + } + return null; + } + } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java index 91a3bea69..3e4838892 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveLoadBalancer.java @@ -15,6 +15,7 @@ */ package com.jd.live.agent.plugin.router.gprc.loadbalance; +import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; import io.grpc.*; import java.lang.reflect.Field; @@ -34,7 +35,7 @@ public class LiveLoadBalancer extends LoadBalancer { private final String serviceName; - private volatile Map subchannels = new ConcurrentHashMap<>(); + private volatile Map endpoints = new ConcurrentHashMap<>(); public LiveLoadBalancer(Helper helper) { this.helper = helper; @@ -45,36 +46,36 @@ public LiveLoadBalancer(Helper helper) { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { Set latestAddresses = deDup(resolvedAddresses); - List removed = new ArrayList<>(); - List added = new ArrayList<>(); + List removed = new ArrayList<>(); + List added = new ArrayList<>(); - Map oldSubchannels = subchannels; - Map newSubchannels = new ConcurrentHashMap<>(); + Map olds = endpoints; + Map news = new ConcurrentHashMap<>(); latestAddresses.forEach(addressGroup -> { - LiveSubchannel subchannel = oldSubchannels.get(addressGroup); - if (subchannel == null) { + GrpcEndpoint endpoint = olds.get(addressGroup); + if (endpoint == null) { // create new connection - subchannel = createSubchannel(addressGroup); - added.add(subchannel); + endpoint = createEndpoint(addressGroup); + added.add(endpoint); } - newSubchannels.put(addressGroup, subchannel); + news.put(addressGroup, endpoint); }); - subchannels = newSubchannels; - oldSubchannels.forEach((addressGroup, subchannel) -> { + endpoints = news; + olds.forEach((addressGroup, endpoint) -> { if (!latestAddresses.contains(addressGroup)) { - removed.remove(subchannel); + removed.remove(endpoint); } }); // close not exists if (!removed.isEmpty()) { - removed.forEach(subchannel -> { - subchannel.setConnectivityState(SHUTDOWN); - subchannel.shutdown(); + removed.forEach(endpoint -> { + endpoint.setConnectivityState(SHUTDOWN); + endpoint.shutdown(); }); } // create new connection if (!added.isEmpty()) { - added.forEach(LiveSubchannel::requestConnection); + added.forEach(GrpcEndpoint::requestConnection); } } @@ -85,7 +86,7 @@ public void handleNameResolutionError(Status error) { @Override public void shutdown() { - subchannels.values().forEach(LiveSubchannel::shutdown); + endpoints.values().forEach(GrpcEndpoint::shutdown); } /** @@ -129,27 +130,30 @@ private Set deDup(ResolvedAddresses resolvedAddresses) { } /** - * Creates a new Subchannel for the given EquivalentAddressGroup. + * Creates a new GrpcEndpoint for the given EquivalentAddressGroup. * - * @param addressGroup the EquivalentAddressGroup to create the Subchannel for - * @return the newly created Subchannel + * @param addressGroup the EquivalentAddressGroup to create the GrpcEndpoint for + * @return the newly created GrpcEndpoint */ - private LiveSubchannel createSubchannel(EquivalentAddressGroup addressGroup) { - Attributes attributes = addressGroup.getAttributes().toBuilder().set(LiveRef.KEY_STATE, new LiveRef<>(IDLE)).build(); + private GrpcEndpoint createEndpoint(EquivalentAddressGroup addressGroup) { + LiveRef ref = new LiveRef(); + Attributes attributes = addressGroup.getAttributes().toBuilder().set(LiveRef.KEY_STATE, ref).build(); CreateSubchannelArgs args = CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).setAttributes(attributes).build(); - LiveSubchannel subchannel = new LiveSubchannel(helper.createSubchannel(args)); - subchannel.start(new LiveSubchannelStateListener(subchannel)); - return subchannel; + GrpcEndpoint endpoint = new GrpcEndpoint(helper.createSubchannel(args)); + ref.setState(IDLE); + ref.setEndpoint(endpoint); + endpoint.start(new LiveStateListener(endpoint)); + return endpoint; } /** - * Picks the ready Subchannels and updates the balancing state accordingly. + * Picks the ready GrpcEndpoints and updates the balancing state accordingly. */ private void pickReady() { - List readies = new ArrayList<>(); - subchannels.values().forEach(subchannel -> { - if (subchannel.getConnectivityState() == ConnectivityState.READY) { - readies.add(subchannel); + List readies = new ArrayList<>(); + endpoints.values().forEach(endpoint -> { + if (endpoint.getConnectivityState() == ConnectivityState.READY) { + readies.add(endpoint); } }); if (readies.isEmpty()) { @@ -163,19 +167,19 @@ private void pickReady() { } } - private class LiveSubchannelStateListener implements SubchannelStateListener { + private class LiveStateListener implements SubchannelStateListener { - private final LiveSubchannel subchannel; + private final GrpcEndpoint endpoint; - LiveSubchannelStateListener(LiveSubchannel subchannel) { - this.subchannel = subchannel; + LiveStateListener(GrpcEndpoint endpoint) { + this.endpoint = endpoint; } @Override public void onSubchannelState(ConnectivityStateInfo stateInfo) { - ConnectivityState currentState = subchannel.getConnectivityState(); + ConnectivityState currentState = endpoint.getConnectivityState(); ConnectivityState newState = stateInfo.getState(); - subchannel.setConnectivityState(newState); + endpoint.setConnectivityState(newState); if (currentState == READY && newState != READY || currentState != READY && newState == READY) { // call helper.updateBalancingState in this thread to avoid exception diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java index de2ee09b3..dbb37bc6c 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRef.java @@ -15,24 +15,37 @@ */ package com.jd.live.agent.plugin.router.gprc.loadbalance; +import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint; import io.grpc.Attributes.Key; import io.grpc.ConnectivityState; -final class LiveRef { +/** + * Represents a live reference to a gRPC endpoint with its connectivity state. + * + * This class contains a gRPC endpoint and its current connectivity state, providing methods to get and set these values. + */ +public final class LiveRef { + + public static final Key KEY_STATE = Key.create("x-state"); - public static final Key> KEY_STATE = Key.create("x-state"); + private GrpcEndpoint endpoint; - T value; + private ConnectivityState state; - LiveRef(T value) { - this.value = value; + public GrpcEndpoint getEndpoint() { + return endpoint; } - public void setValue(T value) { - this.value = value; + public void setEndpoint(GrpcEndpoint endpoint) { + this.endpoint = endpoint; } - public T getValue() { - return value; + public ConnectivityState getState() { + return state; } + + public void setState(ConnectivityState state) { + this.state = state; + } + } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRequest.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRequest.java index 9c76095a1..e7683112a 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRequest.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveRequest.java @@ -43,8 +43,6 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; -import static com.jd.live.agent.core.util.CollectionUtils.convert; - /** * Live request. */ @@ -251,13 +249,13 @@ public void setHeader(String key, String value) { } /** - * Routes the request to an appropriate subchannel from the provided list of subchannels. + * Routes the request to an appropriate endpoint from the provided list of endpoints. * - * @param subchannels The list of available subchannels. + * @param endpoints The list of available endpoint. */ - public void route(List subchannels) { + public void route(List endpoints) { try { - GrpcEndpoint endpoint = context.route(invocation, convert(subchannels, GrpcEndpoint::new)); + GrpcEndpoint endpoint = context.route(invocation, endpoints); routeResult = new LiveRouteResult(endpoint); } catch (Throwable e) { routeResult = new LiveRouteResult(e); @@ -274,7 +272,8 @@ public void route(PickResult pickResult) { if (status.isOk()) { Subchannel subchannel = pickResult.getSubchannel(); if (subchannel != null) { - routeResult = new LiveRouteResult(new GrpcEndpoint(new LiveSubchannel(subchannel))); + LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); + routeResult = new LiveRouteResult(ref != null ? ref.getEndpoint() : new GrpcEndpoint(subchannel)); } else { routeResult = new LiveRouteResult(new RejectNoProviderException("There is no provider for invocation " + request.getService())); } diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannel.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannel.java deleted file mode 100644 index 1db0e60cb..000000000 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannel.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright © ${year} ${owner} (${email}) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.jd.live.agent.plugin.router.gprc.loadbalance; - -import io.grpc.Attributes; -import io.grpc.ConnectivityState; -import io.grpc.LoadBalancer.Subchannel; -import io.grpc.LoadBalancer.SubchannelStateListener; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.List; - -import static io.grpc.ConnectivityState.IDLE; - -/** - * A class that wrap subchannel. - */ -public class LiveSubchannel { - - private final Subchannel subchannel; - - private InetSocketAddress address; - - public LiveSubchannel(Subchannel subchannel) { - this.subchannel = subchannel; - } - - public Subchannel getSubchannel() { - return subchannel; - } - - public InetSocketAddress getAddress() { - return address; - } - - public Attributes getAttributes() { - return subchannel.getAttributes(); - } - - public void requestConnection() { - subchannel.requestConnection(); - } - - public void shutdown() { - subchannel.shutdown(); - } - - public void start(SubchannelStateListener listener) { - subchannel.start(listener); - address = getInetSocketAddress(subchannel); - } - - /** - * Gets the current ConnectivityState. - * - * @return the current ConnectivityState, or IDLE if no state is set - */ - public ConnectivityState getConnectivityState() { - LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); - return ref == null ? IDLE : ref.getValue(); - } - - /** - * Sets the ConnectivityState to the specified newState. - * - * @param newState the new ConnectivityState to set - */ - public void setConnectivityState(ConnectivityState newState) { - LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE); - if (ref != null) { - ref.setValue(newState); - } - } - - private static InetSocketAddress getInetSocketAddress(Subchannel subchannel) { - - List addresses = subchannel.getAllAddresses().get(0).getAddresses(); - for (SocketAddress addr : addresses) { - if (addr instanceof InetSocketAddress) { - return (InetSocketAddress) addr; - } - } - return null; - } -} diff --git a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java index e1336240c..20f4d193b 100644 --- a/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java +++ b/joylive-plugin/joylive-router/joylive-router-grpc/src/main/java/com/jd/live/agent/plugin/router/gprc/loadbalance/LiveSubchannelPicker.java @@ -31,7 +31,7 @@ public class LiveSubchannelPicker extends SubchannelPicker { private final PickResult pickResult; - private final List subchannels; + private final List endpoints; private final AtomicLong counter = new AtomicLong(); @@ -39,13 +39,13 @@ public LiveSubchannelPicker(PickResult pickResult) { this(pickResult, null); } - public LiveSubchannelPicker(List subchannels) { - this(null, subchannels); + public LiveSubchannelPicker(List endpoints) { + this(null, endpoints); } - public LiveSubchannelPicker(PickResult pickResult, List subchannels) { + public LiveSubchannelPicker(PickResult pickResult, List endpoints) { this.pickResult = pickResult; - this.subchannels = subchannels; + this.endpoints = endpoints; } @Override @@ -57,13 +57,13 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } else if (pickResult != null) { return pickResult; } else if (request != null) { - request.route(subchannels); + request.route(endpoints); LiveRouteResult result = request.getRouteResult(); if (result.isSuccess()) { GrpcEndpoint endpoint = result.getEndpoint(); return endpoint == null ? PickResult.withNoResult() - : PickResult.withSubchannel(endpoint.getSubchannel().getSubchannel()); + : PickResult.withSubchannel(endpoint.getSubchannel()); } else { return PickResult.withError(GrpcStatus.createException(result.getThrowable())); } @@ -73,8 +73,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { counter.set(0); v = counter.getAndIncrement(); } - int index = (int) (v % subchannels.size()); - return PickResult.withSubchannel(subchannels.get(index).getSubchannel()); + int index = (int) (v % endpoints.size()); + return PickResult.withSubchannel(endpoints.get(index).getSubchannel()); } }