Skip to content

Commit

Permalink
Improve performance by cache grpc endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed Dec 28, 2024
1 parent c3ff69d commit 5525567
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,43 @@

import com.jd.live.agent.governance.instance.AbstractEndpoint;
import com.jd.live.agent.governance.instance.EndpointState;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveSubchannel;
import com.jd.live.agent.plugin.router.gprc.loadbalance.LiveRef;
import io.grpc.Attributes.Key;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Subchannel;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static io.grpc.ConnectivityState.IDLE;

/**
* A gRPC endpoint that provides a way to expose services over gRPC.
*/
public class GrpcEndpoint extends AbstractEndpoint {

public static final Map<String, Key<String>> KEYS = new ConcurrentHashMap<>();

private final LiveSubchannel subchannel;
private final Subchannel subchannel;

private final InetSocketAddress address;
private InetSocketAddress socketAddress;

public GrpcEndpoint(LiveSubchannel subchannel) {
public GrpcEndpoint(Subchannel subchannel) {
this.subchannel = subchannel;
this.address = subchannel.getAddress();
}

@Override
public String getHost() {
if (address != null) {
return address.getHostString();
}
return null;
return socketAddress == null ? null : socketAddress.getHostString();
}

@Override
public int getPort() {
if (address != null) {
return address.getPort();
}
return 0;
return socketAddress == null ? 0 : socketAddress.getPort();
}

@Override
Expand All @@ -51,8 +51,54 @@ public EndpointState getState() {
return EndpointState.HEALTHY;
}

public LiveSubchannel getSubchannel() {
public Subchannel getSubchannel() {
return subchannel;
}

public void requestConnection() {
subchannel.requestConnection();
}

public void shutdown() {
subchannel.shutdown();
}

public void start(LoadBalancer.SubchannelStateListener listener) {
subchannel.start(listener);
socketAddress = getInetSocketAddress(subchannel);
}

/**
* Gets the current ConnectivityState.
*
* @return the current ConnectivityState, or IDLE if no state is set
*/
public ConnectivityState getConnectivityState() {
LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE);
return ref == null ? IDLE : ref.getState();
}

/**
* Sets the ConnectivityState to the specified newState.
*
* @param newState the new ConnectivityState to set
*/
public void setConnectivityState(ConnectivityState newState) {
LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE);
if (ref != null) {
ref.setState(newState);
}
}

private static InetSocketAddress getInetSocketAddress(Subchannel subchannel) {

List<SocketAddress> addresses = subchannel.getAllAddresses().get(0).getAddresses();
for (SocketAddress addr : addresses) {
if (addr instanceof InetSocketAddress) {
return (InetSocketAddress) addr;
}
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.jd.live.agent.plugin.router.gprc.loadbalance;

import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint;
import io.grpc.*;

import java.lang.reflect.Field;
Expand All @@ -34,7 +35,7 @@ public class LiveLoadBalancer extends LoadBalancer {

private final String serviceName;

private volatile Map<EquivalentAddressGroup, LiveSubchannel> subchannels = new ConcurrentHashMap<>();
private volatile Map<EquivalentAddressGroup, GrpcEndpoint> endpoints = new ConcurrentHashMap<>();

public LiveLoadBalancer(Helper helper) {
this.helper = helper;
Expand All @@ -45,36 +46,36 @@ public LiveLoadBalancer(Helper helper) {
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Set<EquivalentAddressGroup> latestAddresses = deDup(resolvedAddresses);

List<LiveSubchannel> removed = new ArrayList<>();
List<LiveSubchannel> added = new ArrayList<>();
List<GrpcEndpoint> removed = new ArrayList<>();
List<GrpcEndpoint> added = new ArrayList<>();

Map<EquivalentAddressGroup, LiveSubchannel> oldSubchannels = subchannels;
Map<EquivalentAddressGroup, LiveSubchannel> newSubchannels = new ConcurrentHashMap<>();
Map<EquivalentAddressGroup, GrpcEndpoint> olds = endpoints;
Map<EquivalentAddressGroup, GrpcEndpoint> news = new ConcurrentHashMap<>();
latestAddresses.forEach(addressGroup -> {
LiveSubchannel subchannel = oldSubchannels.get(addressGroup);
if (subchannel == null) {
GrpcEndpoint endpoint = olds.get(addressGroup);
if (endpoint == null) {
// create new connection
subchannel = createSubchannel(addressGroup);
added.add(subchannel);
endpoint = createEndpoint(addressGroup);
added.add(endpoint);
}
newSubchannels.put(addressGroup, subchannel);
news.put(addressGroup, endpoint);
});
subchannels = newSubchannels;
oldSubchannels.forEach((addressGroup, subchannel) -> {
endpoints = news;
olds.forEach((addressGroup, endpoint) -> {
if (!latestAddresses.contains(addressGroup)) {
removed.remove(subchannel);
removed.remove(endpoint);
}
});
// close not exists
if (!removed.isEmpty()) {
removed.forEach(subchannel -> {
subchannel.setConnectivityState(SHUTDOWN);
subchannel.shutdown();
removed.forEach(endpoint -> {
endpoint.setConnectivityState(SHUTDOWN);
endpoint.shutdown();
});
}
// create new connection
if (!added.isEmpty()) {
added.forEach(LiveSubchannel::requestConnection);
added.forEach(GrpcEndpoint::requestConnection);
}
}

Expand All @@ -85,7 +86,7 @@ public void handleNameResolutionError(Status error) {

@Override
public void shutdown() {
subchannels.values().forEach(LiveSubchannel::shutdown);
endpoints.values().forEach(GrpcEndpoint::shutdown);
}

/**
Expand Down Expand Up @@ -129,27 +130,30 @@ private Set<EquivalentAddressGroup> deDup(ResolvedAddresses resolvedAddresses) {
}

/**
* Creates a new Subchannel for the given EquivalentAddressGroup.
* Creates a new GrpcEndpoint for the given EquivalentAddressGroup.
*
* @param addressGroup the EquivalentAddressGroup to create the Subchannel for
* @return the newly created Subchannel
* @param addressGroup the EquivalentAddressGroup to create the GrpcEndpoint for
* @return the newly created GrpcEndpoint
*/
private LiveSubchannel createSubchannel(EquivalentAddressGroup addressGroup) {
Attributes attributes = addressGroup.getAttributes().toBuilder().set(LiveRef.KEY_STATE, new LiveRef<>(IDLE)).build();
private GrpcEndpoint createEndpoint(EquivalentAddressGroup addressGroup) {
LiveRef ref = new LiveRef();
Attributes attributes = addressGroup.getAttributes().toBuilder().set(LiveRef.KEY_STATE, ref).build();
CreateSubchannelArgs args = CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).setAttributes(attributes).build();
LiveSubchannel subchannel = new LiveSubchannel(helper.createSubchannel(args));
subchannel.start(new LiveSubchannelStateListener(subchannel));
return subchannel;
GrpcEndpoint endpoint = new GrpcEndpoint(helper.createSubchannel(args));
ref.setState(IDLE);
ref.setEndpoint(endpoint);
endpoint.start(new LiveStateListener(endpoint));
return endpoint;
}

/**
* Picks the ready Subchannels and updates the balancing state accordingly.
* Picks the ready GrpcEndpoints and updates the balancing state accordingly.
*/
private void pickReady() {
List<LiveSubchannel> readies = new ArrayList<>();
subchannels.values().forEach(subchannel -> {
if (subchannel.getConnectivityState() == ConnectivityState.READY) {
readies.add(subchannel);
List<GrpcEndpoint> readies = new ArrayList<>();
endpoints.values().forEach(endpoint -> {
if (endpoint.getConnectivityState() == ConnectivityState.READY) {
readies.add(endpoint);
}
});
if (readies.isEmpty()) {
Expand All @@ -163,19 +167,19 @@ private void pickReady() {
}
}

private class LiveSubchannelStateListener implements SubchannelStateListener {
private class LiveStateListener implements SubchannelStateListener {

private final LiveSubchannel subchannel;
private final GrpcEndpoint endpoint;

LiveSubchannelStateListener(LiveSubchannel subchannel) {
this.subchannel = subchannel;
LiveStateListener(GrpcEndpoint endpoint) {
this.endpoint = endpoint;
}

@Override
public void onSubchannelState(ConnectivityStateInfo stateInfo) {
ConnectivityState currentState = subchannel.getConnectivityState();
ConnectivityState currentState = endpoint.getConnectivityState();
ConnectivityState newState = stateInfo.getState();
subchannel.setConnectivityState(newState);
endpoint.setConnectivityState(newState);
if (currentState == READY && newState != READY
|| currentState != READY && newState == READY) {
// call helper.updateBalancingState in this thread to avoid exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,37 @@
*/
package com.jd.live.agent.plugin.router.gprc.loadbalance;

import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint;
import io.grpc.Attributes.Key;
import io.grpc.ConnectivityState;

final class LiveRef<T> {
/**
* Represents a live reference to a gRPC endpoint with its connectivity state.
*
* This class contains a gRPC endpoint and its current connectivity state, providing methods to get and set these values.
*/
public final class LiveRef {

public static final Key<LiveRef> KEY_STATE = Key.create("x-state");

public static final Key<LiveRef<ConnectivityState>> KEY_STATE = Key.create("x-state");
private GrpcEndpoint endpoint;

T value;
private ConnectivityState state;

LiveRef(T value) {
this.value = value;
public GrpcEndpoint getEndpoint() {
return endpoint;
}

public void setValue(T value) {
this.value = value;
public void setEndpoint(GrpcEndpoint endpoint) {
this.endpoint = endpoint;
}

public T getValue() {
return value;
public ConnectivityState getState() {
return state;
}

public void setState(ConnectivityState state) {
this.state = state;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static com.jd.live.agent.core.util.CollectionUtils.convert;

/**
* Live request.
*/
Expand Down Expand Up @@ -251,13 +249,13 @@ public void setHeader(String key, String value) {
}

/**
* Routes the request to an appropriate subchannel from the provided list of subchannels.
* Routes the request to an appropriate endpoint from the provided list of endpoints.
*
* @param subchannels The list of available subchannels.
* @param endpoints The list of available endpoint.
*/
public void route(List<LiveSubchannel> subchannels) {
public void route(List<GrpcEndpoint> endpoints) {
try {
GrpcEndpoint endpoint = context.route(invocation, convert(subchannels, GrpcEndpoint::new));
GrpcEndpoint endpoint = context.route(invocation, endpoints);
routeResult = new LiveRouteResult(endpoint);
} catch (Throwable e) {
routeResult = new LiveRouteResult(e);
Expand All @@ -274,7 +272,8 @@ public void route(PickResult pickResult) {
if (status.isOk()) {
Subchannel subchannel = pickResult.getSubchannel();
if (subchannel != null) {
routeResult = new LiveRouteResult(new GrpcEndpoint(new LiveSubchannel(subchannel)));
LiveRef ref = subchannel.getAttributes().get(LiveRef.KEY_STATE);
routeResult = new LiveRouteResult(ref != null ? ref.getEndpoint() : new GrpcEndpoint(subchannel));
} else {
routeResult = new LiveRouteResult(new RejectNoProviderException("There is no provider for invocation " + request.getService()));
}
Expand Down
Loading

0 comments on commit 5525567

Please sign in to comment.