From e837ae4189595036398041b7ed298bd8ee22d8ac Mon Sep 17 00:00:00 2001 From: Gustavo Lira Date: Thu, 12 Dec 2024 11:44:10 -0300 Subject: [PATCH 1/2] DBZ-3720 Error with debezium.sink.pulsar.client.serviceUrl and debezium-server DBZ-3720 Error with debezium.sink.pulsar.client.serviceUrl and debezium-server --- .../io/debezium/server/pulsar/PulsarChangeConsumer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java index 2a370ffa..68d88c08 100644 --- a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java +++ b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import io.debezium.util.Strings; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.Dependent; @@ -77,8 +78,12 @@ public interface ProducerBuilder { void connect() { final Config config = ConfigProvider.getConfig(); try { + Map pulsarClientConfig = getConfigSubset(config, PROP_CLIENT_PREFIX); + Map camelCaseConfig = new HashMap<>(); + pulsarClientConfig.forEach((key, value) -> camelCaseConfig.put(Strings.convertDotAndUnderscoreStringToCamelCase(key), value)); + pulsarClient = PulsarClient.builder() - .loadConf(getConfigSubset(config, PROP_CLIENT_PREFIX)) + .loadConf(camelCaseConfig) .build(); } catch (PulsarClientException e) { From 43e7a4d1b5602179f2fe32c4c3643ab72cecb263 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 16 Dec 2024 12:06:01 +0100 Subject: [PATCH 2/2] DBZ-3720 Fix imports --- .../java/io/debezium/server/pulsar/PulsarChangeConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java index 68d88c08..1f92e21d 100644 --- a/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java +++ b/debezium-server-pulsar/src/main/java/io/debezium/server/pulsar/PulsarChangeConsumer.java @@ -14,7 +14,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import io.debezium.util.Strings; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import jakarta.enterprise.context.Dependent; @@ -36,6 +35,7 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.RecordCommitter; import io.debezium.server.BaseChangeConsumer; +import io.debezium.util.Strings; /** * Implementation of the consumer that delivers the messages into a Pulsar destination.