diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..6eff11f8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,56 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# Before building the container image run: +# +# ./mvnw package +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/code-with-quarkus-jvm . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/code-with-quarkus-jvm +# +# If you want to include the debug port into your docker image +# you will have to expose the debug port (default 5005) like this : EXPOSE 8080 5050 +# +# Then run the container using : +# +# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/code-with-quarkus-jvm +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 + +ARG JAVA_PACKAGE=java-21-openjdk-headless +ARG RUN_JAVA_VERSION=1.3.8 +ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' +# Install java and the run-java script +# Also set up permissions for user `1001` +RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \ + && microdnf update \ + && microdnf clean all \ + && mkdir /deployments \ + && mkdir -p /deployments/conf \ + && chown 1001 /deployments \ + && chown 1001 /deployments/conf \ + && chmod "g+rwX" /deployments \ + && chown 1001:root /deployments \ + && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \ + && chown 1001 /deployments/run-java.sh \ + && chmod 540 /deployments/run-java.sh \ + && echo "securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/lib/security/java.security + +# Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size. +ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +COPY --chown=1001 debezium-server-dist/target/*runner.jar /deployments/ +COPY --chown=1001 debezium-server-dist/target/lib/ /deployments/lib/ +COPY --chown=1001 run.sh /deployments/run.sh + +EXPOSE 8080 +USER 1001 + +WORKDIR /deployments + +ENTRYPOINT [ "/deployments/run.sh" ] diff --git a/application.properties b/application.properties new file mode 100644 index 00000000..2b73933e --- /dev/null +++ b/application.properties @@ -0,0 +1,21 @@ +debezium.sink.type=pubsub +debezium.sink.pubsub.project.id=your-project-id +debezium.sink.pubsub.opentelemetry.enabled=true +debezium.transforms=unwrap +debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState +debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector +debezium.source.offset.storage.file.filename=data/offsets.dat +debezium.source.offset.flush.interval.ms=0 +debezium.source.database.hostname=postgres +debezium.source.database.port=5432 +debezium.source.database.user=postgres +debezium.source.database.password=postgres +debezium.source.database.dbname=postgres +debezium.source.topic.prefix=tutorial +debezium.source.table.include.list=inventory.customers +quarkus.otel.exporter.otlp.endpoint=http://jaeger:4317 +# debezium.sink.pubsub.wait.message.delivery.timeout.ms=1000 +# debezium.sink.pubsub.retry.total.timeout.ms=2000 +quarkus.log.console.json=false +quarkus.log.level=TRACE +transforms.outbox.type=io.debezium.transforms.outbox.EventRouter diff --git a/config.yaml b/config.yaml new file mode 100644 index 00000000..99a72f3e --- /dev/null +++ b/config.yaml @@ -0,0 +1,33 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 +exporters: + googlecloud: + project: 'ft-sx-pavlicek-c-1728219053' + log: + default_log_name: opentelemetry.io/collector-exported-log +processors: + memory_limiter: + check_interval: 1s + limit_percentage: 65 + spike_limit_percentage: 20 + batch: + resourcedetection: + detectors: [gcp] + timeout: 10s +service: + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [googlecloud] + metrics: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [googlecloud] + logs: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [googlecloud] diff --git a/debezium-server-bom/pom.xml b/debezium-server-bom/pom.xml index d798c673..f1c01265 100644 --- a/debezium-server-bom/pom.xml +++ b/debezium-server-bom/pom.xml @@ -14,7 +14,7 @@ 2.13.13 2.13.13 - 26.17.0 + 26.48.0 2.10.1 5.12.1 0.13.0 diff --git a/debezium-server-core/pom.xml b/debezium-server-core/pom.xml index e212963b..b8ba2684 100644 --- a/debezium-server-core/pom.xml +++ b/debezium-server-core/pom.xml @@ -16,6 +16,10 @@ io.quarkus quarkus-core + + + io.quarkus + quarkus-opentelemetry io.debezium diff --git a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index 7dd1a67d..1c80fb32 100644 --- a/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -58,6 +58,7 @@ import io.debezium.util.Threads; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.api.OpenTelemetry; /** * Implementation of the consumer that delivers the messages into Google Pub/Sub destination. @@ -70,6 +71,9 @@ public class PubSubChangeConsumer extends BaseChangeConsumer implements Debezium private static final Logger LOGGER = LoggerFactory.getLogger(PubSubChangeConsumer.class); + @Inject + OpenTelemetry openTelemetry; + private static final String PROP_PREFIX = "debezium.sink.pubsub."; private static final String PROP_PROJECT_ID = PROP_PREFIX + "project.id"; @@ -85,6 +89,9 @@ public interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "ordering.enabled", defaultValue = "true") boolean orderingEnabled; + @ConfigProperty(name = PROP_PREFIX + "opentelemetry.enabled", defaultValue = "false") + boolean openTelemetryEnabled; + @ConfigProperty(name = PROP_PREFIX + "ordering.key") Optional orderingKey; @@ -185,6 +192,8 @@ void connect() { try { Builder builder = Publisher.newBuilder(t) .setEnableMessageOrdering(orderingEnabled) + .setEnableOpenTelemetryTracing(openTelemetryEnabled) + .setOpenTelemetry(openTelemetry) .setBatchingSettings(batchingSettings.build()) .setRetrySettings( RetrySettings.newBuilder() @@ -208,6 +217,7 @@ void connect() { } }; + LOGGER.debug("Tracing '{}'", openTelemetryEnabled); LOGGER.info("Using default PublisherBuilder '{}'", publisherBuilder); } @@ -260,11 +270,15 @@ public void handleBatch(List> records, RecordCommitt for (ChangeEvent record : records) { LOGGER.trace("Received event '{}'", record); final String topicName = streamNameMapper.map(record.destination()); + LOGGER.trace("topicName '{}'", topicName); + LOGGER.trace("projectId '{}'", projectId); Publisher publisher = publishers.computeIfAbsent(topicName, (x) -> publisherBuilder.get(ProjectTopicName.of(projectId, x))); + LOGGER.trace("Received event '{}'", publisher); PubsubMessage message = buildPubSubMessage(record); deliveries.add(publisher.publish(message)); + LOGGER.trace("Sent event '{}'", record); } List messageIds; try { diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..75450ab3 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,37 @@ +version: '3.9' +services: + jaeger: + image: jaegertracing/all-in-one:1.47 + ports: + - 4317:4317 + - 16686:16686 + environment: + - COLLECTOR_OTLP_ENABLED=true + postgres: + image: quay.io/debezium/example-postgres:2.4.0.CR1 + container_name: postgres + ports: + - 5432:5432 + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + debezium-server: + image: debezium-server:local + build: + context: . + dockerfile: ./Dockerfile + container_name: debezium-server + environment: + GOOGLE_APPLICATION_CREDENTIALS: /deployments/service-account.json + JAVA_APP_JAR: debezium-server-dist-3.0.0-SNAPSHOT-runner.jar + JAVA_LIB_DIR: /deployments/lib + OTEL_TRACES_EXPORTER: none + OTEL_METRICS_EXPORTER: none + OTEL_EXPORTER_OTLP_ENDPOINT: http://jaeger:4317 + ports: + - "8080:8080" + - "1976:1976" + - "9090:9090" + volumes: + - ./service-account.json:/deployments/service-account.json + - ./application.properties:/deployments/config/application.properties diff --git a/run.sh b/run.sh new file mode 100755 index 00000000..24dd4a1c --- /dev/null +++ b/run.sh @@ -0,0 +1,54 @@ +#!/bin/bash +# +# In order to run Debezium for the Cassandra database, export the CASSANDRA_VERSION environment variable +# with one of the following values: v3, v4, dse. +# +# Copyright Debezium Authors. +# +# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 +# +LIB_PATH="lib/*" +# Configuration files and directories that need to be on the classpath +LIB_CONFIG_PATH="config/lib" + +if [ "$OSTYPE" = "msys" ] || [ "$OSTYPE" = "cygwin" ]; then + PATH_SEP=";" +else + PATH_SEP=":" +fi + +if [ -n "$EXTRA_CONNECTOR" ]; then + EXTRA_CONNECTOR=${EXTRA_CONNECTOR,,} + export EXTRA_CONNECTOR_DIR="connectors/debezium-connector-${EXTRA_CONNECTOR}" + + echo "Connector - ${EXTRA_CONNECTOR} loaded from ${EXTRA_CONNECTOR_DIR}" + + if [ -f "${EXTRA_CONNECTOR_DIR}/jdk_java_options.sh" ]; then + source "${EXTRA_CONNECTOR_DIR}/jdk_java_options.sh" + fi + + EXTRA_CLASS_PATH="" + if [ -f "${EXTRA_CONNECTOR_DIR}/extra_class_path.sh" ]; then + source "${EXTRA_CONNECTOR_DIR}/extra_class_path.sh" + LIB_PATH=$EXTRA_CLASS_PATH$LIB_PATH + fi +fi + +if [ -z "$JAVA_HOME" ]; then + JAVA_BINARY="java" +else + JAVA_BINARY="$JAVA_HOME/bin/java" +fi + +RUNNER=$(ls debezium-server-*runner.jar) + +ENABLE_DEBEZIUM_SCRIPTING=${ENABLE_DEBEZIUM_SCRIPTING:-false} +if [[ "${ENABLE_DEBEZIUM_SCRIPTING}" == "true" ]]; then + LIB_PATH=$LIB_PATH$PATH_SEP"lib_opt/*" +fi + +source ./jmx/enable_jmx.sh +source ./lib_metrics/enable_exporter.sh +echo "$JAVA_BINARY" $DEBEZIUM_OPTS $JAVA_OPTS -cp $RUNNER$PATH_SEP$LIB_CONFIG_PATH$PATH_SEP$LIB_PATH io.debezium.server.Main +exec "$JAVA_BINARY" $DEBEZIUM_OPTS $JAVA_OPTS -cp \ + $RUNNER$PATH_SEP$LIB_CONFIG_PATH$PATH_SEP$LIB_PATH io.debezium.server.Main \ No newline at end of file