Skip to content

Commit

Permalink
Merge pull request #183
Browse files Browse the repository at this point in the history
162-support-spring-cloud-grpc-outbound-grovernace
  • Loading branch information
chenzhiguo authored Dec 29, 2024
2 parents a4cc676 + 5525567 commit 32ef3cf
Show file tree
Hide file tree
Showing 37 changed files with 1,293 additions and 581 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.exception;

public interface WrappedException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
*/
package com.jd.live.agent.core.util;

import com.jd.live.agent.core.exception.WrappedException;

import java.lang.reflect.InvocationTargetException;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -81,4 +85,14 @@ public static Set<String> getExceptions(Throwable e, Predicate<Throwable> predic
return names;
}

/**
* Checks if the given throwable is not a wrapped exception.
*
* @param e The throwable to check.
* @return {@code true} if the throwable is not a wrapped exception, {@code false} otherwise.
*/
public static boolean isNoneWrapped(Throwable e) {
return !(e instanceof WrappedException || e instanceof InvocationTargetException || e instanceof ExecutionException);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static com.jd.live.agent.core.util.ExceptionUtils.isNoneWrapped;
import static com.jd.live.agent.governance.exception.ErrorPolicy.containsException;

@AllArgsConstructor
Expand Down Expand Up @@ -109,7 +110,7 @@ public static ErrorCause cause(Throwable throwable,
Set<String> exceptions = new LinkedHashSet<>(8);
Throwable cause = null;
Throwable candiate = null;
Throwable t = throwable;
Throwable t = isNoneWrapped(throwable) ? throwable : throwable.getCause();
ErrorName errorName;
Predicate<Throwable> test = predicate == null ? null : predicate.getPredicate();
while (t != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public GatewayRole getGateway() {
* Resets the state of the instance.
* This method is typically used to restore the instance to its initial state.
*/
public void reset() {
public void resetOnRetry() {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected LiveParser createLiveParser() {
}

@Override
public void reset() {
public void resetOnRetry() {
listeners = null;
routeTarget = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.governance.invoke.OutboundInvocation;
import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy;
import com.jd.live.agent.governance.request.RoutedRequest;
import com.jd.live.agent.governance.request.ServiceRequest.OutboundRequest;
import com.jd.live.agent.governance.response.ServiceResponse.OutboundResponse;

Expand Down Expand Up @@ -72,17 +73,14 @@ E extends Endpoint> CompletionStage<O> invoke(LiveCluster<R, O, E> cluster, Outb
InvocationContext context = invocation.getContext();
R request = invocation.getRequest();
List<? extends Endpoint> instances = invocation.getInstances();
if (counter > 0) {
invocation.reset();
}
CompletionStage<List<E>> discoveryStage = instances == null || instances.isEmpty() || counter > 0
? cluster.route(request)
: CompletableFuture.completedFuture((List<E>) instances);
discoveryStage.whenComplete((v, t) -> {
if (t == null) {
E endpoint = null;
try {
endpoint = context.route(invocation, v);
endpoint = request instanceof RoutedRequest ? ((RoutedRequest) request).getEndpoint() : context.route(invocation, v);
E instance = endpoint;
onStartRequest(cluster, request, endpoint);
CompletionStage<O> stage = context.outbound(invocation, endpoint, () -> cluster.invoke(request, instance));
Expand Down Expand Up @@ -193,8 +191,8 @@ E extends Endpoint> void onException(LiveCluster<R, O, E> cluster,
try {
invocation.onFailure(endpoint, cause);
if (error == null) {
// Request was handled successfully by degrade
cluster.onSuccess(response, request, endpoint);
// Request was recover successfully by degrade
cluster.onRecover(response, request, endpoint);
} else if (cause instanceof LiveException) {
// Request did not go off box
cluster.onDiscard(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import com.jd.live.agent.bootstrap.exception.RejectException.RejectUnreadyException;
import com.jd.live.agent.bootstrap.exception.Unretryable;
import com.jd.live.agent.bootstrap.logger.Logger;
import com.jd.live.agent.bootstrap.logger.LoggerFactory;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
Expand All @@ -28,7 +26,6 @@
import com.jd.live.agent.governance.exception.ServiceError;
import com.jd.live.agent.governance.instance.Endpoint;
import com.jd.live.agent.governance.invoke.OutboundInvocation;
import com.jd.live.agent.governance.invoke.filter.route.CircuitBreakerFilter;
import com.jd.live.agent.governance.policy.service.ServicePolicy;
import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy;
import com.jd.live.agent.governance.policy.service.cluster.RetryPolicy;
Expand All @@ -55,8 +52,6 @@
@Extension(value = ClusterInvoker.TYPE_FAILOVER, order = ClusterInvoker.ORDER_FAILOVER)
public class FailoverClusterInvoker extends AbstractClusterInvoker {

private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerFilter.class);

@Inject
private Map<String, ErrorParser> codeParsers;

Expand All @@ -73,9 +68,9 @@ E extends Endpoint> CompletionStage<O> execute(LiveCluster<R, O, E> cluster,
R request = invocation.getRequest();
request.addErrorPolicy(retryPolicy);
RetryContext<R, O, E> retryContext = new RetryContext<>(codeParsers, retryPolicy, cluster);
Supplier<CompletionStage<O>> supplier = () -> invoke(cluster, invocation, retryContext.getCount());
Supplier<CompletionStage<O>> supplier = () -> invoke(cluster, invocation, retryContext.getAndIncrement());
cluster.onStart(request);
return retryContext.execute(request, supplier).exceptionally(e ->
return retryContext.execute(invocation, supplier).exceptionally(e ->
cluster.createResponse(
cluster.createException(e, invocation),
request,
Expand Down Expand Up @@ -144,45 +139,44 @@ private static class RetryContext<R extends OutboundRequest,
* retried until the policy's conditions are no longer met.
* </p>
*
* @param request The request that was being processed.
* @param supplier A supplier providing the operation to be executed as a {@link CompletionStage}.
* @param invocation The {@link OutboundInvocation} representing the specific request and its routing information.
* @param supplier A supplier providing the operation to be executed as a {@link CompletionStage}.
* @return A {@link CompletionStage} representing the eventual completion of the operation,
* either successfully or with an error.
*/
public CompletionStage<O> execute(R request, Supplier<CompletionStage<O>> supplier) {
public CompletionStage<O> execute(OutboundInvocation<R> invocation, Supplier<CompletionStage<O>> supplier) {
CompletableFuture<O> result = new CompletableFuture<>();
doExecute(request, supplier, result);
doExecute(invocation, supplier, result);
return result;

}

/**
* Recursively executes the operation, applying retry logic and completing the future
* based on the outcome of each attempt.
* <p>
* This method is called internally to handle the actual execution and retry logic.
* It uses the {@link RetryPolicy} to determine whether an operation should be retried
* in case of failure or certain response conditions.
* </p>
*
* @param request The request that was being processed.
* @param supplier A supplier providing the operation to be executed.
* @param future The {@link CompletableFuture} to be completed with the operation's result.
* @param invocation The {@link OutboundInvocation} representing the specific request and its routing information.
* @param supplier A supplier providing the operation to be executed.
* @param future The {@link CompletableFuture} to be completed with the operation's result.
*/
private void doExecute(R request, Supplier<CompletionStage<O>> supplier, CompletableFuture<O> future) {
int count = counter.getAndIncrement();
cluster.onRetry(count);
private void doExecute(OutboundInvocation<R> invocation, Supplier<CompletionStage<O>> supplier, CompletableFuture<O> future) {
int count = counter.get();
R request = invocation.getRequest();
if (count > 0) {
invocation.resetOnRetry();
}
cluster.onRetry(request, count);
CompletionStage<O> stage = supplier.get();
stage.whenComplete((v, e) -> {
ServiceError se = v.getError();
ServiceError se = v == null ? null : v.getError();
Throwable throwable = se == null ? e : se.getThrowable();
switch (isRetryable(request, v, e, count)) {
case RETRY:
Throwable unreadyException = cluster.isDestroyed() ? cluster.createException(new RejectUnreadyException(), request) : null;
if (unreadyException != null) {
future.completeExceptionally(unreadyException);
} else {
doExecute(request, supplier, future);
doExecute(invocation, supplier, future);
}
break;
case EXHAUSTED:
Expand All @@ -201,8 +195,8 @@ private void doExecute(R request, Supplier<CompletionStage<O>> supplier, Complet
});
}

private int getCount() {
return counter.get();
private int getAndIncrement() {
return counter.getAndIncrement();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,11 @@ default void onDiscard(R request) {
/**
* Handles retry logic for a process or operation, providing a hook for custom actions on each retry attempt.
*
* @param request The request object that is about to be sent
* @param retries The number of retry attempts that have occurred so far. This count is 1-based, meaning that
* the first retry attempt will pass a value of 1.
*/
default void onRetry(int retries) {
default void onRetry(R request, int retries) {

}

Expand Down Expand Up @@ -236,5 +237,16 @@ default void onSuccess(O response, R request, E endpoint) {

}

/**
* Handles the recovery of a request by delegating to the onSuccess method.
*
* @param response The response object.
* @param request The request object that was recovered.
* @param endpoint The endpoint associated with the request.
*/
default void onRecover(O response, R request, E endpoint) {
onSuccess(response, request, endpoint);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.governance.request;

import com.jd.live.agent.governance.instance.Endpoint;

/**
* An interface representing a request that has already been routed to an appropriate endpoint.
* Extends the {@link Request} interface.
*/
public interface RoutedRequest extends Request {

/**
* Retrieves the endpoint to which the request has been routed.
*
* @param <E> The type of the endpoint.
* @return The selected endpoint for routing the request.
*/
<E extends Endpoint> E getEndpoint();
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package com.jd.live.agent.governance.util;

import com.jd.live.agent.core.util.ExceptionUtils;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

Expand All @@ -35,8 +35,6 @@ public class ResponseUtils {

public static final int HEADER_SIZE_LIMIT = 1024 * 2;

public static final Predicate<Throwable> NONE_EXECUTION_PREDICATE = e -> !(e instanceof ExecutionException) && !(e instanceof InvocationTargetException);

/**
* Converts a Set of Strings to a String, using the specified delimiter and truncating if the resulting String exceeds the maximum length.
*
Expand Down Expand Up @@ -103,7 +101,7 @@ public static void labelHeaders(Throwable e, BiConsumer<String, String> consumer
* @param consumer a BiConsumer to accept the generated headers
*/
public static void labelHeaders(Throwable e, Predicate<Throwable> predicate, BiConsumer<String, String> consumer) {
describe(e, predicate == null ? NONE_EXECUTION_PREDICATE : predicate, (name, message) -> {
describe(e, predicate == null ? ExceptionUtils::isNoneWrapped : predicate, (name, message) -> {
if (name != null && !name.isEmpty()) {
consumer.accept(EXCEPTION_NAMES_LABEL, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

import java.util.concurrent.ThreadLocalRandom;

@GrpcService
public class UserServiceGrpcImpl extends UserServiceGrpc.UserServiceImplBase {

@Override
public void get(UserGetRequest request, StreamObserver<UserGetResponse> responseObserver) {
if (ThreadLocalRandom.current().nextBoolean()) {
responseObserver.onError(new RuntimeException("error"));
return;
}
UserGetResponse.Builder builder = UserGetResponse.newBuilder();
builder.setId(request.getId())
.setName("index :" + request.getId() + " time : " + System.currentTimeMillis())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ spring:
password: ${NACOS_PASSWORD:nacos}
grpc:
server:
port: 9898
port: ${GRPC_SERVER_PORT:${random.int[20000,21000]}}
server:
port: ${SERVER_PORT:9899}
port: ${SERVER_PORT:${random.int[20000,21000]}}
Loading

0 comments on commit 32ef3cf

Please sign in to comment.