Skip to content

Commit

Permalink
Merge pull request #8 from fauxauldrich/develop
Browse files Browse the repository at this point in the history
Merge Develop
  • Loading branch information
shubhendumadhukar authored Apr 1, 2021
2 parents eaed9c6 + 70fbbd9 commit 01d3454
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 37 deletions.
35 changes: 22 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,38 @@ https://github.com/fauxauldrich/kafka-service/wiki
## Running the application locally

**Run as a Docker container**

Fastest way to get started is by using the image available on [Docker Hub](https://hub.docker.com/r/shubhendumadhukar/kafka-service):

- Use the `docker-compose.yml` to bring up your container. (Modify `docker-compose.yml` to reflect the dir path for your truststore files under volumes and update environment variables accordingly)
- Or alternatively, to build image locally use Dockerfile provided.
- Build with : `docker build -t kafka-service:${VERSION} .`
- Run as a container using: `docker run -d -env spring.kafka.bootstrap-servers=127.0.0.1:9092 -env kafkaservice.truststore.location=/home/truststore.jks -env kafkaservice.truststore.password=Password@123 --name kafka-service -p 8080:8080 -v /dir/containing/truststore.jks/files:/app/certs kafka-service:${VERSION}`

There are several other ways to run a Spring Boot application on your local machine. One way is to execute the `main` method in the `com.fauxauldrich.kafkaservice.KafkaServiceApplication` class from your IDE.
- Build with : `docker build -t kafka-service:${VERSION} .`
- Run as a container using:
````shell
docker run -d -env spring.kafka.bootstrap-servers=127.0.0.1:9092 -env kafkaservice.truststore.location=/app/certs/truststore.jks -env kafkaservice.truststore.password=Password@123 --name kafka-service -p 8080:8080 -v /dir/containing/truststore.jks/files:/app/certs kafka-service:${VERSION}```
````
Alternatively you can use the [Spring Boot Maven plugin](https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) like so:
**Run from your IDE**
```shell
mvn spring-boot:run
```
- Execute the `main` method in the `com.fauxauldrich.kafkaservice.KafkaServiceApplication` class from your IDE.
- Or you can use the [Spring Boot Maven plugin](https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) like so:
```shell
mvn spring-boot:run
```

You can also download the JAR file from [Releases](https://github.com/fauxauldrich/kafka-service/releases) and run it locally:
**Run as a JAR**

- `java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}.jar`
- You can also download the JAR file from [Releases](https://github.com/fauxauldrich/kafka-service/releases) and run it locally:
```shell
java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}-java11.jar
```
- Or, you can build locally and run the jar file

Or, you can build locally and run the jar file
```shell
./mvnw clean install package -f pom.xml
- `./mvnw clean install package -f pom.xml`
- `java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}.jar`
java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}-java11.jar
```

## Copyright

Expand Down
14 changes: 12 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
</parent>
<groupId>com.fauxauldrich</groupId>
<artifactId>kafka-service</artifactId>
<version>0.0.1</version>
<version>0.0.1-java11</version>
<packaging>jar</packaging>
<name>kafka-service</name>
<description>Demo project for Spring Boot</description>
<description>HTTP Service for Testing Kafka Cluster</description>
<properties>
<java.version>11</java.version>
</properties>
Expand Down Expand Up @@ -54,6 +54,16 @@
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import io.micrometer.core.annotation.Timed;

@RestController
@RequestMapping("/kafka/dynamic")
public class DynamicProducerResource {
Expand All @@ -26,54 +28,62 @@ public class DynamicProducerResource {
private Handlebars handebarsConfig;

@PostMapping("/publish")
@Timed(description = "publish_dynamic")
public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModel payload)
throws IOException {

Template template = handebarsConfig.compileInline(payload.getMessage());
Template keyTemplate = handebarsConfig.compileInline(KEY);

producerService.publish(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID);
producerService.publish(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""), PARTITION_ID);

return "Published successfully";
}

@PostMapping("/publish/with-headers")
@Timed(description = "publish_dynamic_with_headers")
public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModelWithHeaders payload)
throws IOException {

Template template = handebarsConfig.compileInline(payload.getMessage());
Template keyTemplate = handebarsConfig.compileInline(KEY);

producerService.publishWithHeaders(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID,
payload.getHeaders());
producerService.publishWithHeaders(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""),
PARTITION_ID, payload.getHeaders());

return "Published successfully";
}

@PostMapping("/publish/secure")
@Timed(description = "publish_dynamic_secure")
public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam() String TRUSTSTORE_LOCATION,
@RequestParam() String TRUSTSTORE_PASSWORD, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModel payload)
throws IOException {

Template template = handebarsConfig.compileInline(payload.getMessage());
Template keyTemplate = handebarsConfig.compileInline(KEY);

producerService.publishSecure(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID,
TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD);
producerService.publishSecure(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""),
PARTITION_ID, TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD);

return "Published successfully";
}

@PostMapping("/publish/secure/with-headers")
@Timed(description = "publish_dynamic_secure_with_headers")
public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam() String TRUSTSTORE_LOCATION,
@RequestParam() String TRUSTSTORE_PASSWORD, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModelWithHeaders payload)
throws IOException {

Template template = handebarsConfig.compileInline(payload.getMessage());
Template keyTemplate = handebarsConfig.compileInline(KEY);

producerService.publishSecureWithHeaders(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID,
TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD, payload.getHeaders());
producerService.publishSecureWithHeaders(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""),
PARTITION_ID, TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD, payload.getHeaders());

return "Published successfully";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import io.micrometer.core.annotation.Timed;

@RestController
@RequestMapping("/kafka")
public class ProducerResource {
Expand All @@ -31,22 +33,25 @@ public class ProducerResource {
private Handlebars handebarsConfig;

@PostMapping("/publish")
@Timed(description = "publish_simple")
public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModel payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
if (KEY != null) {
Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
kafkaTemplate.send(TOPIC, PARTITION_ID, KEY, template.apply(""));
kafkaTemplate.send(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
}
kafkaTemplate.send(TOPIC, KEY, template.apply(""));
kafkaTemplate.send(TOPIC, keyTemplate.apply(""), template.apply(""));
} else {
kafkaTemplate.send(TOPIC, template.apply(""));
}
return "Published successfully";
}

@PostMapping("/publish/with-headers")
@Timed(description = "publish_with_headers")
public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModelWithHeaders payload)
throws IOException {
Expand All @@ -55,40 +60,46 @@ public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @Reques
ProducerRecord<String, Object> record;

if (KEY != null) {
Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
record = new ProducerRecord<>(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
} else {
record = new ProducerRecord<>(TOPIC, KEY, template.apply(""));
record = new ProducerRecord<>(TOPIC, keyTemplate.apply(""), template.apply(""));
}
} else {
record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
record = new ProducerRecord<>(TOPIC, template.apply(""));
}

for (Map.Entry<String, String> entry : payload.getHeaders().entrySet())
record.headers().add(new RecordHeader(entry.getKey(), entry.getValue().getBytes()));
for (Map.Entry<String, String> entry : payload.getHeaders().entrySet()) {
Template headerTemplate = handebarsConfig.compileInline(entry.getValue());
record.headers().add(new RecordHeader(entry.getKey(), (headerTemplate.apply("")).getBytes()));
}

kafkaTemplate.send(record);

return "Published successfully";
}

@PostMapping("/publish/secure")
@Timed(description = "publish_secure")
public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModel payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
if (KEY != null) {
Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
kafkaSecureTemplate.send(TOPIC, PARTITION_ID, KEY, template.apply(""));
kafkaSecureTemplate.send(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
}
kafkaSecureTemplate.send(TOPIC, KEY, template.apply(""));
kafkaSecureTemplate.send(TOPIC, keyTemplate.apply(""), template.apply(""));
} else {
kafkaSecureTemplate.send(TOPIC, template.apply(""));
}
return "Published successfully";
}

@PostMapping("/publish/secure/with-headers")
@Timed(description = "publish_secure_with_headers")
public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModelWithHeaders payload)
throws IOException {
Expand All @@ -97,17 +108,20 @@ public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @Reques
ProducerRecord<String, Object> record;

if (KEY != null) {
Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
record = new ProducerRecord<>(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
} else {
record = new ProducerRecord<>(TOPIC, KEY, template.apply(""));
record = new ProducerRecord<>(TOPIC, keyTemplate.apply(""), template.apply(""));
}
} else {
record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
record = new ProducerRecord<>(TOPIC, template.apply(""));
}

for (Map.Entry<String, String> entry : payload.getHeaders().entrySet())
record.headers().add(new RecordHeader(entry.getKey(), entry.getValue().getBytes()));
for (Map.Entry<String, String> entry : payload.getHeaders().entrySet()) {
Template headerTemplate = handebarsConfig.compileInline(entry.getValue());
record.headers().add(new RecordHeader(entry.getKey(), (headerTemplate.apply("")).getBytes()));
}

kafkaSecureTemplate.send(record);

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
spring.kafka.bootstrap-servers=127.0.0.1:9092
springdoc.swagger-ui.path=/swagger-ui
management.endpoints.web.exposure.include=health,info,metrics,env
management.endpoints.web.exposure.include=health,info,metrics,env,prometheus
kafkaservice.truststore.location=/home/truststore.jks
kafkaservice.truststore.password=Password@123

0 comments on commit 01d3454

Please sign in to comment.