From 594961a3bc2d2034fb67eebf9b5f7856bdccd7ad Mon Sep 17 00:00:00 2001 From: Shubhendu Madhukar Date: Thu, 25 Mar 2021 10:21:35 +0530 Subject: [PATCH 1/2] Format README --- README.md | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 11131a9..251a1bb 100644 --- a/README.md +++ b/README.md @@ -18,29 +18,38 @@ https://github.com/fauxauldrich/kafka-service/wiki ## Running the application locally **Run as a Docker container** + Fastest way to get started is by using the image available on [Docker Hub](https://hub.docker.com/r/shubhendumadhukar/kafka-service): - Use the `docker-compose.yml` to bring up your container. (Modify `docker-compose.yml` to reflect the dir path for your truststore files under volumes and update environment variables accordingly) - Or alternatively, to build image locally use Dockerfile provided. -- Build with : `docker build -t kafka-service:${VERSION} .` -- Run as a container using: `docker run -d -env spring.kafka.bootstrap-servers=127.0.0.1:9092 -env kafkaservice.truststore.location=/home/truststore.jks -env kafkaservice.truststore.password=Password@123 --name kafka-service -p 8080:8080 -v /dir/containing/truststore.jks/files:/app/certs kafka-service:${VERSION}` - -There are several other ways to run a Spring Boot application on your local machine. One way is to execute the `main` method in the `com.fauxauldrich.kafkaservice.KafkaServiceApplication` class from your IDE. + - Build with : `docker build -t kafka-service:${VERSION} .` + - Run as a container using: + ````shell + docker run -d -env spring.kafka.bootstrap-servers=127.0.0.1:9092 -env kafkaservice.truststore.location=/app/certs/truststore.jks -env kafkaservice.truststore.password=Password@123 --name kafka-service -p 8080:8080 -v /dir/containing/truststore.jks/files:/app/certs kafka-service:${VERSION}``` + ```` -Alternatively you can use the [Spring Boot Maven plugin](https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) like so: +**Run from your IDE** -```shell -mvn spring-boot:run -``` +- Execute the `main` method in the `com.fauxauldrich.kafkaservice.KafkaServiceApplication` class from your IDE. +- Or you can use the [Spring Boot Maven plugin](https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) like so: + ```shell + mvn spring-boot:run + ``` -You can also download the JAR file from [Releases](https://github.com/fauxauldrich/kafka-service/releases) and run it locally: +**Run as a JAR** -- `java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}.jar` +- You can also download the JAR file from [Releases](https://github.com/fauxauldrich/kafka-service/releases) and run it locally: + ```shell + java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}.jar + ``` +- Or, you can build locally and run the jar file -Or, you can build locally and run the jar file + ```shell + ./mvnw clean install package -f pom.xml -- `./mvnw clean install package -f pom.xml` -- `java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}.jar` + java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}.jar + ``` ## Copyright From 70fbbd9779551c545e897c405e6369cc93765109 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Thu, 1 Apr 2021 11:22:27 +0000 Subject: [PATCH 2/2] - Allow Handlebars in key - Previously only supported in message - Add prometheus scraping endpoint - /actuator/prometheus - Add custom counters and timers - prometheus monitoring --- README.md | 4 +- pom.xml | 14 ++++++- .../resource/DynamicProducerResource.java | 24 +++++++---- .../resource/ProducerResource.java | 42 ++++++++++++------- src/main/resources/application.properties | 2 +- 5 files changed, 60 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 251a1bb..89c162a 100644 --- a/README.md +++ b/README.md @@ -41,14 +41,14 @@ Fastest way to get started is by using the image available on [Docker Hub](https - You can also download the JAR file from [Releases](https://github.com/fauxauldrich/kafka-service/releases) and run it locally: ```shell - java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}.jar + java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}-java11.jar ``` - Or, you can build locally and run the jar file ```shell ./mvnw clean install package -f pom.xml - java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}.jar + java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}-java11.jar ``` ## Copyright diff --git a/pom.xml b/pom.xml index 4418fc4..99f7197 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ com.fauxauldrich kafka-service - 0.0.1 + 0.0.1-java11 jar kafka-service - Demo project for Spring Boot + HTTP Service for Testing Kafka Cluster 11 @@ -54,6 +54,16 @@ guava 29.0-jre + + io.micrometer + micrometer-core + 1.6.0 + + + io.micrometer + micrometer-registry-prometheus + 1.6.0 + diff --git a/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java b/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java index 38ad6b4..d70abfa 100644 --- a/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java +++ b/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java @@ -15,6 +15,8 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import io.micrometer.core.annotation.Timed; + @RestController @RequestMapping("/kafka/dynamic") public class DynamicProducerResource { @@ -26,54 +28,62 @@ public class DynamicProducerResource { private Handlebars handebarsConfig; @PostMapping("/publish") + @Timed(description = "publish_dynamic") public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModel payload) throws IOException { Template template = handebarsConfig.compileInline(payload.getMessage()); + Template keyTemplate = handebarsConfig.compileInline(KEY); - producerService.publish(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID); + producerService.publish(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""), PARTITION_ID); return "Published successfully"; } @PostMapping("/publish/with-headers") + @Timed(description = "publish_dynamic_with_headers") public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModelWithHeaders payload) throws IOException { Template template = handebarsConfig.compileInline(payload.getMessage()); + Template keyTemplate = handebarsConfig.compileInline(KEY); - producerService.publishWithHeaders(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID, - payload.getHeaders()); + producerService.publishWithHeaders(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""), + PARTITION_ID, payload.getHeaders()); return "Published successfully"; } @PostMapping("/publish/secure") + @Timed(description = "publish_dynamic_secure") public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam() String TRUSTSTORE_LOCATION, @RequestParam() String TRUSTSTORE_PASSWORD, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModel payload) throws IOException { Template template = handebarsConfig.compileInline(payload.getMessage()); + Template keyTemplate = handebarsConfig.compileInline(KEY); - producerService.publishSecure(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID, - TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD); + producerService.publishSecure(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""), + PARTITION_ID, TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD); return "Published successfully"; } @PostMapping("/publish/secure/with-headers") + @Timed(description = "publish_dynamic_secure_with_headers") public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam() String TRUSTSTORE_LOCATION, @RequestParam() String TRUSTSTORE_PASSWORD, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModelWithHeaders payload) throws IOException { Template template = handebarsConfig.compileInline(payload.getMessage()); + Template keyTemplate = handebarsConfig.compileInline(KEY); - producerService.publishSecureWithHeaders(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID, - TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD, payload.getHeaders()); + producerService.publishSecureWithHeaders(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""), + PARTITION_ID, TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD, payload.getHeaders()); return "Published successfully"; } diff --git a/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java b/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java index 5cf94f2..99b918b 100644 --- a/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java +++ b/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java @@ -18,6 +18,8 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import io.micrometer.core.annotation.Timed; + @RestController @RequestMapping("/kafka") public class ProducerResource { @@ -31,15 +33,17 @@ public class ProducerResource { private Handlebars handebarsConfig; @PostMapping("/publish") + @Timed(description = "publish_simple") public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModel payload) throws IOException { Template template = handebarsConfig.compileInline(payload.getMessage()); if (KEY != null) { + Template keyTemplate = handebarsConfig.compileInline(KEY); if (PARTITION_ID != null) { - kafkaTemplate.send(TOPIC, PARTITION_ID, KEY, template.apply("")); + kafkaTemplate.send(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply("")); } - kafkaTemplate.send(TOPIC, KEY, template.apply("")); + kafkaTemplate.send(TOPIC, keyTemplate.apply(""), template.apply("")); } else { kafkaTemplate.send(TOPIC, template.apply("")); } @@ -47,6 +51,7 @@ public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(requ } @PostMapping("/publish/with-headers") + @Timed(description = "publish_with_headers") public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModelWithHeaders payload) throws IOException { @@ -55,17 +60,20 @@ public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @Reques ProducerRecord record; if (KEY != null) { + Template keyTemplate = handebarsConfig.compileInline(KEY); if (PARTITION_ID != null) { - record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply("")); + record = new ProducerRecord<>(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply("")); } else { - record = new ProducerRecord<>(TOPIC, KEY, template.apply("")); + record = new ProducerRecord<>(TOPIC, keyTemplate.apply(""), template.apply("")); } } else { - record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply("")); + record = new ProducerRecord<>(TOPIC, template.apply("")); } - for (Map.Entry entry : payload.getHeaders().entrySet()) - record.headers().add(new RecordHeader(entry.getKey(), entry.getValue().getBytes())); + for (Map.Entry entry : payload.getHeaders().entrySet()) { + Template headerTemplate = handebarsConfig.compileInline(entry.getValue()); + record.headers().add(new RecordHeader(entry.getKey(), (headerTemplate.apply("")).getBytes())); + } kafkaTemplate.send(record); @@ -73,15 +81,17 @@ record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply("")); } @PostMapping("/publish/secure") + @Timed(description = "publish_secure") public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModel payload) throws IOException { Template template = handebarsConfig.compileInline(payload.getMessage()); if (KEY != null) { + Template keyTemplate = handebarsConfig.compileInline(KEY); if (PARTITION_ID != null) { - kafkaSecureTemplate.send(TOPIC, PARTITION_ID, KEY, template.apply("")); + kafkaSecureTemplate.send(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply("")); } - kafkaSecureTemplate.send(TOPIC, KEY, template.apply("")); + kafkaSecureTemplate.send(TOPIC, keyTemplate.apply(""), template.apply("")); } else { kafkaSecureTemplate.send(TOPIC, template.apply("")); } @@ -89,6 +99,7 @@ public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam(requ } @PostMapping("/publish/secure/with-headers") + @Timed(description = "publish_secure_with_headers") public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY, @RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModelWithHeaders payload) throws IOException { @@ -97,17 +108,20 @@ public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @Reques ProducerRecord record; if (KEY != null) { + Template keyTemplate = handebarsConfig.compileInline(KEY); if (PARTITION_ID != null) { - record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply("")); + record = new ProducerRecord<>(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply("")); } else { - record = new ProducerRecord<>(TOPIC, KEY, template.apply("")); + record = new ProducerRecord<>(TOPIC, keyTemplate.apply(""), template.apply("")); } } else { - record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply("")); + record = new ProducerRecord<>(TOPIC, template.apply("")); } - for (Map.Entry entry : payload.getHeaders().entrySet()) - record.headers().add(new RecordHeader(entry.getKey(), entry.getValue().getBytes())); + for (Map.Entry entry : payload.getHeaders().entrySet()) { + Template headerTemplate = handebarsConfig.compileInline(entry.getValue()); + record.headers().add(new RecordHeader(entry.getKey(), (headerTemplate.apply("")).getBytes())); + } kafkaSecureTemplate.send(record); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index eb78af9..3433e0c 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,5 +1,5 @@ spring.kafka.bootstrap-servers=127.0.0.1:9092 springdoc.swagger-ui.path=/swagger-ui -management.endpoints.web.exposure.include=health,info,metrics,env +management.endpoints.web.exposure.include=health,info,metrics,env,prometheus kafkaservice.truststore.location=/home/truststore.jks kafkaservice.truststore.password=Password@123