Skip to content

Commit

Permalink
feat : reduce retries to 2 and polish junits (#624)
Browse files Browse the repository at this point in the history
* feat : reduce retries to 2 and polish junits

* polish ReadMe

* polish junits

* fix : assertions

* polish
  • Loading branch information
rajadilipkolli authored Jan 10, 2025
1 parent 3805ef1 commit b1628a4
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 158 deletions.
61 changes: 61 additions & 0 deletions kafka-spring-boot/boot-kafka-sample/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,64 @@
This sample demonstrates sending message to topic (test_1), after listening to it will send the same message to (test_2).

test_2 topic is configured for validation and if it fails then it will be retried for 3 times and moved to deadletter queue using non blocking way

## Kafka Listener Management Endpoint

Exposed endpoint to start and stop kafka listener on demand

### Endpoint URL
`/listeners`

### HTTP Methods
- `GET`: Retrieve the current state of all Kafka listeners
- `POST`: Update the state (start/stop) of a specific Kafka listener

### Request/Response Formats

#### GET
Response Format (application/json):
```json
{
"topic_2_Listener": true,
"topic_2_Listener-retry": true,
"topic_1_Listener": true,
"topic_2_Listener-dlt": true
}
```
#### POST
Request Format (application/json):

```json
{
"containerId": "topic_2_Listener",
"operation": "STOP" // Allowed values: START, STOP
}
```

Response Format: Same as GET response

### Example Usage

#### Get Listeners State

```shell
curl -X GET http://localhost:8080/listeners
```

#### Stop a Listener
```shell
curl -X POST http://localhost:8080/listeners \
-H "Content-Type: application/json" \
-d '{"containerId":"topic_2_Listener","operation":"STOP"}'
```

#### Start a Listener
```shell
curl -X POST http://localhost:8080/listeners \
-H "Content-Type: application/json" \
-d '{"containerId":"topic_2_Listener","operation":"START"}'
```

### Error Responses
- `400 Bad Request`: Invalid operation value or malformed request
- `404 Not Found`: Specified listener container ID not found
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

@Component
public class Initializer implements CommandLineRunner {
public static final String TOPIC_TEST_1 = "test_1";

public static final String TOPIC_TEST_1 = "test_1";
public static final String TOPIC_TEST_2 = "test_2";

private static final String TOPIC_TEST_3 = "test_3";

private final Sender sender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
import org.springframework.lang.NonNull;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

@Configuration
@Configuration(proxyBeanMethods = false)
@EnableKafka
public class KafkaConfig implements KafkaListenerConfigurer {
class KafkaConfig implements KafkaListenerConfigurer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfig.class);

private final LocalValidatorFactoryBean validator;

public KafkaConfig(LocalValidatorFactoryBean validator) {
KafkaConfig(LocalValidatorFactoryBean validator) {
this.validator = validator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
import io.swagger.v3.oas.annotations.servers.Server;
import org.springframework.context.annotation.Configuration;

@Configuration
@OpenAPIDefinition(info = @Info(title = "spring-boot-sample", version = "v1"), servers = @Server(url = "/"))
public class SwaggerConfig {}
@Configuration(proxyBeanMethods = false)
@OpenAPIDefinition(info = @Info(title = "boot-kafka-sample", version = "v1"), servers = @Server(url = "/"))
class SwaggerConfig {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.TopicInfo;
import com.example.springbootkafkasample.service.MessageService;
import com.example.springbootkafkasample.service.sender.Sender;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
Expand All @@ -21,18 +20,25 @@
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageRestController {
class MessageRestController {

private final Sender sender;
private final MessageService messageService;

public MessageRestController(Sender sender, MessageService messageService) {
this.sender = sender;
MessageRestController(MessageService messageService) {
this.messageService = messageService;
}

@Operation(summary = "Get the state of all Kafka listeners")
@ApiResponses(
value = {
@ApiResponse(
responseCode = "200",
description = "Retrieved listeners state successfully",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = Map.class)))
})
@GetMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> getListeners() {
ResponseEntity<Map<String, Boolean>> getListeners() {
return ResponseEntity.ok(messageService.getListenersState());
}

Expand All @@ -48,18 +54,18 @@ public ResponseEntity<Map<String, Boolean>> getListeners() {
@ApiResponse(responseCode = "404", description = "Listener not found", content = @Content)
})
@PostMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> updateListenerState(
ResponseEntity<Map<String, Boolean>> updateListenerState(
@RequestBody @Valid final KafkaListenerRequest kafkaListenerRequest) {
return ResponseEntity.ok(messageService.updateListenerState(kafkaListenerRequest));
}

@PostMapping("/messages")
public void sendMessage(@RequestBody MessageDTO messageDTO) {
this.sender.send(messageDTO);
void sendMessage(@RequestBody MessageDTO messageDTO) {
this.messageService.send(messageDTO);
}

@GetMapping("/topics")
public List<TopicInfo> getTopicsWithPartitionsCount(
List<TopicInfo> getTopicsWithPartitionsCount(
@RequestParam(required = false, defaultValue = "false") boolean showInternalTopics) {
return messageService.getTopicsWithPartitions(showInternalTopics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;

public record KafkaListenerRequest(
@NotBlank(message = "Container ID must not be blank") String containerId,
@NotBlank(message = "Container ID must not be blank")
@Pattern(regexp = "^\\S.*$", message = "Container ID must not start with whitespace")
String containerId,
@NotNull(message = "Operation must not be null") Operation operation) {}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.example.springbootkafkasample.service;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.Operation;
import com.example.springbootkafkasample.dto.TopicInfo;
import com.example.springbootkafkasample.service.sender.Sender;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand All @@ -25,10 +27,13 @@ public class MessageService {

private final KafkaAdmin kafkaAdmin;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
private final Sender sender;

public MessageService(KafkaAdmin kafkaAdmin, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
public MessageService(
KafkaAdmin kafkaAdmin, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, Sender sender) {
this.kafkaAdmin = kafkaAdmin;
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
this.sender = sender;
}

public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
Expand Down Expand Up @@ -89,4 +94,8 @@ public Map<String, Boolean> updateListenerState(KafkaListenerRequest kafkaListen
}
return getListenersState();
}

public void send(MessageDTO messageDTO) {
sender.send(messageDTO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ public class Receiver1 {

private static final Logger logger = LoggerFactory.getLogger(Receiver1.class);

@KafkaListener(topics = TOPIC_TEST_1, groupId = "foo", errorHandler = "validationErrorHandler")
@KafkaListener(
id = "topic_1_Listener",
topics = TOPIC_TEST_1,
groupId = "foo",
errorHandler = "validationErrorHandler")
@SendTo(TOPIC_TEST_2)
public MessageDTO listen(@Header(KafkaHeaders.RECEIVED_KEY) UUID key, ConsumerRecord<String, MessageDTO> cr) {
logger.info("Received message : {} with Key :{} in topic :{}", cr.toString(), key, cr.topic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public CountDownLatch getDeadLetterLatch() {
}

@RetryableTopic(
attempts = "3",
attempts = "2",
backoff = @Backoff(delay = 1000, multiplier = 2.0),
exclude = {MethodArgumentNotValidException.class},
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = TOPIC_TEST_2, groupId = "foo")
@KafkaListener(id = "topic_2_Listener", topics = TOPIC_TEST_2, groupId = "foo")
public void listenTopic2(@Payload @Valid MessageDTO messageDTO, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
logger.info("Received message : {} in topic :{}", messageDTO.toString(), topic);
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@

spring.kafka.consumer.group-id=foo
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.UUIDDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.springbootkafkasample.dto
spring.kafka.listener.ack-mode=RECORD


spring.kafka.producer.properties.max.in.flight.requests.per.connection=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.UUIDSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.service.listener.Receiver2;
import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
Expand All @@ -17,7 +14,6 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
Expand All @@ -38,12 +34,6 @@ class KafkaSampleApplicationTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

@Autowired
private KafkaTemplate<UUID, MessageDTO> template;

@Autowired
private Receiver2 receiver2;

@Autowired
private MockMvc mockMvc;

Expand All @@ -70,12 +60,31 @@ void topicsWithPartitionsCount() throws Exception {
String expectedJson =
"""
[
{"topicName":"test_2-dlt","partitionCount":1,"replicationCount":1},
{"topicName":"test_3","partitionCount":2,"replicationCount":1},
{"topicName":"test_2","partitionCount":2,"replicationCount":1},
{"topicName":"test_1","partitionCount":2,"replicationCount":1},
{"topicName":"test_2-retry-0","partitionCount":1,"replicationCount":1},
{"topicName":"test_2-retry-1","partitionCount":1,"replicationCount":1}
{
"topicName": "test_1",
"partitionCount": 2,
"replicationCount": 1
},
{
"topicName": "test_2",
"partitionCount": 2,
"replicationCount": 1
},
{
"topicName": "test_2-dlt",
"partitionCount": 1,
"replicationCount": 1
},
{
"topicName": "test_2-retry",
"partitionCount": 1,
"replicationCount": 1
},
{
"topicName": "test_3",
"partitionCount": 2,
"replicationCount": 1
}
]
""";
this.mockMvc
Expand Down
Loading

0 comments on commit b1628a4

Please sign in to comment.