Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8299 - Upgrade Google BOM for PubSub #130

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
#
# Before building the container image run:
#
# ./mvnw package
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/code-with-quarkus-jvm .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/code-with-quarkus-jvm
#
# If you want to include the debug port into your docker image
# you will have to expose the debug port (default 5005) like this : EXPOSE 8080 5050
#
# Then run the container using :
#
# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/code-with-quarkus-jvm
#
###
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3

ARG JAVA_PACKAGE=java-21-openjdk-headless
ARG RUN_JAVA_VERSION=1.3.8
ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
# Install java and the run-java script
# Also set up permissions for user `1001`
RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
&& microdnf update \
&& microdnf clean all \
&& mkdir /deployments \
&& mkdir -p /deployments/conf \
&& chown 1001 /deployments \
&& chown 1001 /deployments/conf \
&& chmod "g+rwX" /deployments \
&& chown 1001:root /deployments \
&& curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \
&& chown 1001 /deployments/run-java.sh \
&& chmod 540 /deployments/run-java.sh \
&& echo "securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/lib/security/java.security

# Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size.
ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
COPY --chown=1001 debezium-server-dist/target/*runner.jar /deployments/
COPY --chown=1001 debezium-server-dist/target/lib/ /deployments/lib/
COPY --chown=1001 run.sh /deployments/run.sh

EXPOSE 8080
USER 1001

WORKDIR /deployments

ENTRYPOINT [ "/deployments/run.sh" ]
21 changes: 21 additions & 0 deletions application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=your-project-id
debezium.sink.pubsub.opentelemetry.enabled=true
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.table.include.list=inventory.customers
quarkus.otel.exporter.otlp.endpoint=http://jaeger:4317
# debezium.sink.pubsub.wait.message.delivery.timeout.ms=1000
# debezium.sink.pubsub.retry.total.timeout.ms=2000
quarkus.log.console.json=false
quarkus.log.level=TRACE
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
33 changes: 33 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
exporters:
googlecloud:
project: 'ft-sx-pavlicek-c-1728219053'
log:
default_log_name: opentelemetry.io/collector-exported-log
processors:
memory_limiter:
check_interval: 1s
limit_percentage: 65
spike_limit_percentage: 20
batch:
resourcedetection:
detectors: [gcp]
timeout: 10s
service:
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [googlecloud]
metrics:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [googlecloud]
logs:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [googlecloud]
2 changes: 1 addition & 1 deletion debezium-server-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<properties>
<version.kinesis>2.13.13</version.kinesis>
<version.sqs>2.13.13</version.sqs>
<version.pubsub>26.17.0</version.pubsub>
<version.pubsub>26.48.0</version.pubsub>
<version.pulsar>2.10.1</version.pulsar>
<version.eventhubs>5.12.1</version.eventhubs>
<version.pravega>0.13.0</version.pravega>
Expand Down
4 changes: 4 additions & 0 deletions debezium-server-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.debezium.util.Threads;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.OpenTelemetry;

/**
* Implementation of the consumer that delivers the messages into Google Pub/Sub destination.
Expand All @@ -70,6 +71,9 @@ public class PubSubChangeConsumer extends BaseChangeConsumer implements Debezium

private static final Logger LOGGER = LoggerFactory.getLogger(PubSubChangeConsumer.class);

@Inject
OpenTelemetry openTelemetry;

private static final String PROP_PREFIX = "debezium.sink.pubsub.";
private static final String PROP_PROJECT_ID = PROP_PREFIX + "project.id";

Expand All @@ -85,6 +89,9 @@ public interface PublisherBuilder {
@ConfigProperty(name = PROP_PREFIX + "ordering.enabled", defaultValue = "true")
boolean orderingEnabled;

@ConfigProperty(name = PROP_PREFIX + "opentelemetry.enabled", defaultValue = "false")
boolean openTelemetryEnabled;

@ConfigProperty(name = PROP_PREFIX + "ordering.key")
Optional<String> orderingKey;

Expand Down Expand Up @@ -185,6 +192,8 @@ void connect() {
try {
Builder builder = Publisher.newBuilder(t)
.setEnableMessageOrdering(orderingEnabled)
.setEnableOpenTelemetryTracing(openTelemetryEnabled)
.setOpenTelemetry(openTelemetry)
.setBatchingSettings(batchingSettings.build())
.setRetrySettings(
RetrySettings.newBuilder()
Expand All @@ -208,6 +217,7 @@ void connect() {
}
};

LOGGER.debug("Tracing '{}'", openTelemetryEnabled);
LOGGER.info("Using default PublisherBuilder '{}'", publisherBuilder);
}

Expand Down Expand Up @@ -260,11 +270,15 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record);
final String topicName = streamNameMapper.map(record.destination());
LOGGER.trace("topicName '{}'", topicName);
LOGGER.trace("projectId '{}'", projectId);
Publisher publisher = publishers.computeIfAbsent(topicName, (x) -> publisherBuilder.get(ProjectTopicName.of(projectId, x)));
LOGGER.trace("Received event '{}'", publisher);

PubsubMessage message = buildPubSubMessage(record);

deliveries.add(publisher.publish(message));
LOGGER.trace("Sent event '{}'", record);
}
List<String> messageIds;
try {
Expand Down
37 changes: 37 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: '3.9'
services:
jaeger:
image: jaegertracing/all-in-one:1.47
ports:
- 4317:4317
- 16686:16686
environment:
- COLLECTOR_OTLP_ENABLED=true
postgres:
image: quay.io/debezium/example-postgres:2.4.0.CR1
container_name: postgres
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
debezium-server:
image: debezium-server:local
build:
context: .
dockerfile: ./Dockerfile
container_name: debezium-server
environment:
GOOGLE_APPLICATION_CREDENTIALS: /deployments/service-account.json
JAVA_APP_JAR: debezium-server-dist-3.0.0-SNAPSHOT-runner.jar
JAVA_LIB_DIR: /deployments/lib
OTEL_TRACES_EXPORTER: none
OTEL_METRICS_EXPORTER: none
OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4317
ports:
- "8080:8080"
- "1976:1976"
- "9090:9090"
volumes:
- ./service-account.json:/deployments/service-account.json
- ./application.properties:/deployments/config/application.properties
54 changes: 54 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/bin/bash
#
# In order to run Debezium for the Cassandra database, export the CASSANDRA_VERSION environment variable
# with one of the following values: v3, v4, dse.
#
# Copyright Debezium Authors.
#
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
#
LIB_PATH="lib/*"
# Configuration files and directories that need to be on the classpath
LIB_CONFIG_PATH="config/lib"

if [ "$OSTYPE" = "msys" ] || [ "$OSTYPE" = "cygwin" ]; then
PATH_SEP=";"
else
PATH_SEP=":"
fi

if [ -n "$EXTRA_CONNECTOR" ]; then
EXTRA_CONNECTOR=${EXTRA_CONNECTOR,,}
export EXTRA_CONNECTOR_DIR="connectors/debezium-connector-${EXTRA_CONNECTOR}"

echo "Connector - ${EXTRA_CONNECTOR} loaded from ${EXTRA_CONNECTOR_DIR}"

if [ -f "${EXTRA_CONNECTOR_DIR}/jdk_java_options.sh" ]; then
source "${EXTRA_CONNECTOR_DIR}/jdk_java_options.sh"
fi

EXTRA_CLASS_PATH=""
if [ -f "${EXTRA_CONNECTOR_DIR}/extra_class_path.sh" ]; then
source "${EXTRA_CONNECTOR_DIR}/extra_class_path.sh"
LIB_PATH=$EXTRA_CLASS_PATH$LIB_PATH
fi
fi

if [ -z "$JAVA_HOME" ]; then
JAVA_BINARY="java"
else
JAVA_BINARY="$JAVA_HOME/bin/java"
fi

RUNNER=$(ls debezium-server-*runner.jar)

ENABLE_DEBEZIUM_SCRIPTING=${ENABLE_DEBEZIUM_SCRIPTING:-false}
if [[ "${ENABLE_DEBEZIUM_SCRIPTING}" == "true" ]]; then
LIB_PATH=$LIB_PATH$PATH_SEP"lib_opt/*"
fi

source ./jmx/enable_jmx.sh
source ./lib_metrics/enable_exporter.sh
echo "$JAVA_BINARY" $DEBEZIUM_OPTS $JAVA_OPTS -cp $RUNNER$PATH_SEP$LIB_CONFIG_PATH$PATH_SEP$LIB_PATH io.debezium.server.Main
exec "$JAVA_BINARY" $DEBEZIUM_OPTS $JAVA_OPTS -cp \
$RUNNER$PATH_SEP$LIB_CONFIG_PATH$PATH_SEP$LIB_PATH io.debezium.server.Main