Skip to content

Commit

Permalink
GH-249 Code Review comments addressed. Added Test cases with RetryAdv…
Browse files Browse the repository at this point in the history
…ice.
  • Loading branch information
siddharthjain210 committed Dec 29, 2024
1 parent 4d4fcf8 commit 186314e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.KPLBackpressureException;
import org.springframework.integration.aws.support.KplBackpressureException;
import org.springframework.integration.aws.support.UserRecordResponse;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageHandler;
Expand All @@ -67,6 +67,8 @@
* The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer
* Library {@code putRecord(s)}.
*
* @exception KplBackpressureException When backpressure handling is enabled and buffer is at max capacity.
*
* @author Arnaud Lecollaire
* @author Artem Bilan
* @author Siddharth Jain
Expand Down Expand Up @@ -121,13 +123,16 @@ public void setConverter(Converter<Object, byte[]> converter) {

/**
* Configure maximum records in flight for handling backpressure. By Default, backpressure handling is not enabled.
* On number of records in flight exceeding the threshold, {@link KPLBackpressureException} would be thrown.
* If Backpressure handling is enabled, {@link KPLBackpressureException} must be handled.
* On number of records in flight exceeding the threshold, {@link KplBackpressureException} would be thrown.
* If Backpressure handling is enabled, {@link KplBackpressureException} must be handled.
*
* @param backPressureThreshold Defaulted to 0. Set a value greater than 0 to enable backpressure handling.
*
* @since 3.0.9
*/
public void setBackPressureThreshold(long backPressureThreshold) {
Assert.isTrue(backPressureThreshold > 0, "'backPressureThreshold must be greater than 0.");
Assert.isTrue(backPressureThreshold >= 0,
"'backPressureThreshold must be greater than equal to 0.");
this.backPressureThreshold = backPressureThreshold;
}

Expand Down Expand Up @@ -383,16 +388,12 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
}
}

private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord)
throws KPLBackpressureException {

private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
if (this.backPressureThreshold > 0) {
var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount();
if (numberOfRecordsInFlight > this.backPressureThreshold) {
logger.error(String.format("Backpressure handling is enabled, Number of records in flight: %s is " +
"greater than backpressure threshold: %s" +
".", numberOfRecordsInFlight, this.backPressureThreshold));
throw new KPLBackpressureException("Buffer already at max capacity.");
throw new KplBackpressureException("Cannot send record to kinesis since buffer is at max capacity.",
userRecord);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,32 @@

package org.springframework.integration.aws.support;

import org.springframework.messaging.MessagingException;
import com.amazonaws.services.kinesis.producer.UserRecord;

/**
* An exception triggered from {@link org.springframework.integration.aws.outbound.KplMessageHandler} while sending
* records to kinesis when maximum number of records in flight exceeds the backpressure threshold.
*
* @author Siddharth Jain
*
* @since 3.0.9
*/
public class KPLBackpressureException extends MessagingException {
public class KplBackpressureException extends RuntimeException {

private static final long serialVersionUID = 1L;

public KPLBackpressureException(String message) {
private final UserRecord userRecord;

public KplBackpressureException(String message, UserRecord userRecord) {
super(message);
this.userRecord = userRecord;
}

/**
* Get the {@link UserRecord} related.
* @return {@link UserRecord} linked while sending the record to kinesis.
*/
public UserRecord getUserRecord() {
return this.userRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,30 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.KPLBackpressureException;
import org.springframework.integration.aws.support.KplBackpressureException;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

/**
/** The class contains test cases for KplMessageHandler.
*
* @author Siddharth Jain
*
* @since 3.0.9
*/
@SpringJUnitConfig
Expand All @@ -63,42 +69,42 @@ public class KplMessageHandlerTests {

@Test
@SuppressWarnings("unchecked")
void testKPLMessageHandler_raw_payload_success() {
void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock());
final Message<?> message = MessageBuilder
.withPayload("message1")
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
.withPayload("someMessage")
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("foo", "bar")
.setHeader("someHeaderKey", "someHeaderValue")
.build();


ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
.forClass(UserRecord.class);

this.kplMessageHandler.setBackPressureThreshold(0);
this.kinesisSendChannel.send(message);
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
verify(this.kinesisProducer, Mockito.times(0)).getOutstandingRecordsCount();
verify(this.kinesisProducer, Mockito.never()).getOutstandingRecordsCount();
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
assertThat(userRecord.getStreamName()).isEqualTo("foo");
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey");
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
assertThat(userRecord.getExplicitHashKey()).isNull();
}

@Test
@SuppressWarnings("unchecked")
void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() {
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock());
this.kplMessageHandler.setBackPressureThreshold(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
.willReturn(1);
final Message<?> message = MessageBuilder
.withPayload("message1")
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
.withPayload("someMessage")
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("foo", "bar")
.setHeader("someHeaderKey", "someHeaderValue")
.build();


Expand All @@ -107,40 +113,36 @@ void testKPLMessageHandler_raw_payload_backpressure_capacity_available_test() {

this.kinesisSendChannel.send(message);
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount();
verify(this.kinesisProducer).getOutstandingRecordsCount();
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
assertThat(userRecord.getStreamName()).isEqualTo("foo");
assertThat(userRecord.getPartitionKey()).isEqualTo("fooKey");
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
assertThat(userRecord.getExplicitHashKey()).isNull();
}

@Test
@SuppressWarnings("unchecked")
void testKPLMessageHandler_raw_payload_backpressure_insufficient_capacity_test() {
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() {
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
.willReturn(mock());
this.kplMessageHandler.setBackPressureThreshold(2);
given(this.kinesisProducer.getOutstandingRecordsCount())
.willReturn(5);
final Message<?> message = MessageBuilder
.withPayload("message1")
.setHeader(AwsHeaders.PARTITION_KEY, "fooKey")
.withPayload("someMessage")
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
.setHeader("foo", "bar")
.setHeader("someHeaderKey", "someHeaderValue")
.build();

try {
this.kinesisSendChannel.send(message);
}
catch (Exception ex) {
assertThat(ex).isNotNull();
assertThat(ex.getCause()).isNotNull();
assertThat(ex.getCause().getClass()).isEqualTo(KPLBackpressureException.class);
assertThat(ex.getCause().getMessage()).isEqualTo("Buffer already at max capacity.");
}
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> this.kinesisSendChannel.send(message))
.withCauseInstanceOf(MessageHandlingException.class)
.withRootCauseExactlyInstanceOf(KplBackpressureException.class)
.withStackTraceContaining("Cannot send record to kinesis since buffer is at max capacity.");

verify(this.kinesisProducer, Mockito.times(0)).addUserRecord(any(UserRecord.class));
verify(this.kinesisProducer, Mockito.times(1)).getOutstandingRecordsCount();
verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class));
verify(this.kinesisProducer).getOutstandingRecordsCount();
}

@AfterEach
Expand All @@ -158,13 +160,25 @@ public KinesisProducer kinesisProducer() {
}

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder()
.retryOn(KplBackpressureException.class)
.exponentialBackoff(100, 2.0, 1000)
.maxAttempts(3)
.build());
return requestHandlerRetryAdvice;
}

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = {"retryAdvice"})
public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
kplMessageHandler.setAsync(true);
kplMessageHandler.setStream("foo");
kplMessageHandler.setStream("someStream");
return kplMessageHandler;
}

}

}

0 comments on commit 186314e

Please sign in to comment.