Skip to content

Commit

Permalink
updates with sequence diagram
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Dec 26, 2024
1 parent 20f3051 commit 3c785e5
Showing 1 changed file with 75 additions and 0 deletions.
75 changes: 75 additions & 0 deletions kafka-spring-boot/boot-concurrent-kafka-consumer/ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,81 @@ This project demonstrates a multi-threaded Kafka consumer approach using Spring

### Sequence Diagram

```mermaid
sequenceDiagram
participant C as ConcurrentKafkaConsumer
participant A as AppListener
participant P as ToUpperStringProcessor
C->>C: Spring Boot initializes application
C->>A: Kafka messages arrive, triggering onMessage()
A->>A: Retrieve or initialize ThreadLocal<ToUpperStringProcessor>
A->>P: processString(value)
P->>P: Converts to uppercase & enqueues result
Note over A,P: Additional messages handled similarly
A->>A: onEvent(ConsumerStoppedEvent) -> Remove ThreadLocal processor
```

## Project Structure

### 1. `ConcurrentKafkaConsumer`
- **Location**: `com.example.kafka`
- **Description**:
- Serves as the Spring Boot application entry point.
- Annotated with `@SpringBootApplication`.
- The `main` method runs the Spring application using `SpringApplication.run`.

### 2. `AppListener`
- **Location**: `com.example.kafka.listener`
- **Description**:
- A `@Component` that subscribes to a Kafka topic (referenced by `SPRING_KAFKA_TEST_TOPIC`).
- Uses a `ThreadLocal<ToUpperStringProcessor>` to manage message processing on a per-thread basis.
- Processes messages in the `onMessage` method and handles resource cleanup in `onEvent` when the consumer stops.

### 3. `ToUpperStringProcessor`
- **Location**: `com.example.kafka.processor`
- **Description**:
- A `@Component` with prototype scope, ensuring each instance is unique (e.g., one per thread).
- Transforms incoming strings to uppercase and stores them in a `BlockingQueue`.
- Offers methods like `distinctQueuedData()` to retrieve unique processed values.

## Message Processing Sequence

1. **Startup and Configuration**
- The `ConcurrentKafkaConsumer` class launches the Spring Boot application.
- Kafka consumer properties are configured in `application.properties`, including concurrency level and bootstrap servers.

2. **Receiving Messages**
- The `AppListener` class is annotated with `@KafkaListener`, making it an endpoint for Kafka messages.
- When a message arrives, `onMessage` is triggered, and a `ToUpperStringProcessor` is retrieved or created for the current thread.

3. **Processing**
- The processor converts the message to uppercase and stores it in a thread-safe queue.
- Each thread maintains its own `ToUpperStringProcessor`, eliminating conflicts between parallel consumers.

4. **Resource Cleanup**
- When a consumer stops, `AppListener`’s `onEvent` method removes the processor from its `ThreadLocal`, preventing resource leaks.

## Running the Application

1. **Build and Start**

```
./mvnw clean package
./mvnw spring-boot:run
```
2. **Send Kafka Messages**
Send messages to the topic defined in `application.properties` (default is `SPRING_KAFKA_TEST_TOPIC`).
3. **Observe Logs**
Each thread logs its own unique processor ID and message processing. Check the console or log files for details.
## Key Advantages
- **Increased Throughput**: Multiple threads can consume and process messages in parallel.
- **Isolation**: Each thread has a dedicated processor, avoiding synchronization overload.
- **Scalability**: The concurrency level can be fine-tuned via application properties.
### Format code
Expand Down

0 comments on commit 3c785e5

Please sign in to comment.