From d6c6232a6754dd8f99dcc1e5e1505900e4b5a797 Mon Sep 17 00:00:00 2001
From: chakreshpatel
Date: Fri, 22 Dec 2023 21:29:23 +0530
Subject: [PATCH 1/7] feat: Built Java KafkaToGCSDstream template for Dataproc

---
 java/pom.xml                                  |   6 +
 .../dataproc/templates/BaseTemplate.java      |   3 +-
 .../dataproc/templates/kafka/KafkaReader.java |   6 +
 .../templates/kafka/KafkaToGCSDstream.java    | 197 ++++++++++++++++++
 .../cloud/dataproc/templates/kafka/README.md  |  74 +++++++
 .../templates/main/DataProcTemplate.java      |   2 +
 .../templates/util/TemplateConstants.java     |   6 +
 java/src/main/resources/template.properties   |   5 +
 .../kafka/KafkaTOGCSDstreamTest.java          |  81 +++++++
 9 files changed, 379 insertions(+), 1 deletion(-)
 create mode 100644 java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
 create mode 100644 java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java

diff --git a/java/pom.xml b/java/pom.xml
index 7230a04c0..3ca9ec726 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -321,6 +321,12 @@
       <artifactId>google-cloud-bigtable</artifactId>
       <version>2.1.0</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/BaseTemplate.java b/java/src/main/java/com/google/cloud/dataproc/templates/BaseTemplate.java
index 149ab3129..11c56b215 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/BaseTemplate.java
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/BaseTemplate.java
@@ -57,7 +57,8 @@ enum TemplateName {
     GCSTOBIGTABLE,
     TEXTTOBIGQUERY,
     JDBCTOJDBC,
-    PUBSUBLITETOBIGTABLE
+    PUBSUBLITETOBIGTABLE,
+    KAFKATOGCSDSTREAM
   }
 
   default Properties getProperties() {
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaReader.java b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaReader.java
index 0504e5190..abdebfe20 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaReader.java
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaReader.java
@@ -62,7 +62,13 @@ public Dataset<Row> readKafkaTopic(SparkSession spark, Properties prop) {
 
     // check on memory constraints
+    return getDatasetByMessageFormat(inputData, prop);
+  }
+
+  public Dataset<Row> getDatasetByMessageFormat(Dataset<Row> inputData, Properties prop) {
     Dataset<Row> processedData = null;
+    String kakfaMessageFormat = prop.getProperty(KAFKA_MESSAGE_FORMAT);
+    String kafkaSchemaUrl = prop.getProperty(KAFKA_SCHEMA_URL);
 
     switch (kakfaMessageFormat) {
       case "bytes":
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
new file mode 100644
index 000000000..1092850b3
--- /dev/null
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright (C) 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataproc.templates.kafka;
+
+import static com.google.cloud.dataproc.templates.util.TemplateConstants.*;
+
+import com.google.cloud.dataproc.templates.BaseTemplate;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.*;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka010.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+/**
+ * Spark job to move data and/or schema from a Kafka topic to GCS via a Spark Direct Stream. This
+ * template can be configured to run in a few different modes; in the default mode,
+ * kafka.gcs.write.mode is set to "append". For a detailed list of properties refer to the
+ * "KafkaToGCSDstream Template properties" section in the resources/template.properties file.
+ */
+public class KafkaToGCSDstream implements BaseTemplate {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(KafkaToGCSDstream.class);
+
+  private long batchInterval;
+  private String kafkaMessageFormat;
+  private String gcsOutputLocation;
+  private String gcsWriteMode;
+  private String gcsOutputFormat;
+  private String kafkaStartingOffsets;
+  private String kafkaGroupId;
+  private String kafkaBootstrapServers;
+  private String kafkaTopic;
+  private final String sparkLogLevel;
+  private final String kafkaSchemaUrl;
+
+  public KafkaToGCSDstream() {
+
+    kafkaBootstrapServers = getProperties().getProperty(KAFKA_BOOTSTRAP_SERVERS);
+    kafkaTopic = getProperties().getProperty(KAFKA_TOPIC);
+    kafkaMessageFormat = getProperties().getProperty(KAFKA_MESSAGE_FORMAT);
+    gcsOutputLocation = getProperties().getProperty(KAFKA_GCS_OUTPUT_LOCATION);
+    gcsOutputFormat = getProperties().getProperty(KAFKA_GCS_OUTPUT_FORMAT);
+    kafkaStartingOffsets = getProperties().getProperty(KAFKA_STARTING_OFFSET);
+    kafkaGroupId = getProperties().getProperty(KAFKA_CONSUMER_GROUP_ID);
+    batchInterval = Long.parseLong(getProperties().getProperty(KAFKA_BATCH_INTERVAL));
+    gcsWriteMode = getProperties().getProperty(KAFKA_GCS_WRITE_MODE);
+    sparkLogLevel = getProperties().getProperty(SPARK_LOG_LEVEL);
+    kafkaSchemaUrl = getProperties().getProperty(KAFKA_SCHEMA_URL);
+  }
+
+  @Override
+  public void runTemplate() throws TimeoutException, SQLException, InterruptedException {
+
+    SparkSession spark =
+        SparkSession.builder().appName("Kafka to GCS via Direct stream").getOrCreate();
+
+    spark.sparkContext().setLogLevel(sparkLogLevel);
+
+    Map<String, Object> kafkaParams = new HashMap<>();
+    kafkaParams.put("bootstrap.servers", kafkaBootstrapServers);
+    kafkaParams.put("key.deserializer", StringDeserializer.class);
+    kafkaParams.put("value.deserializer", StringDeserializer.class);
+    kafkaParams.put("group.id", kafkaGroupId);
+    kafkaParams.put("auto.offset.reset", kafkaStartingOffsets);
+    kafkaParams.put("enable.auto.commit", false);
+
+    Collection<String> topics = Collections.singletonList(kafkaTopic);
+
+    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkContext, new Duration(batchInterval));
+    KafkaReader reader = new KafkaReader();
+
+    JavaInputDStream<ConsumerRecord<String, String>> stream =
+        KafkaUtils.createDirectStream(
+            ssc,
+            LocationStrategies.PreferConsistent(),
+            ConsumerStrategies.Subscribe(topics, kafkaParams));
+
+    stream.foreachRDD(
+        (VoidFunction2<JavaRDD<ConsumerRecord<String, String>>, Time>)
+            (rdd, time) -> {
+              LOGGER.debug("Reading kafka data");
+
+              OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+              JavaRDD<Tuple2<String, String>> recordRdd =
+                  rdd.map(record -> new Tuple2<>(record.key(), record.value()));
+
+              Dataset<Row> rowDataset =
+                  spark
+                      .createDataset(
+                          recordRdd.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
+                      .withColumnRenamed("_1", "key")
+                      .withColumnRenamed("_2", "value");
+
+              Dataset<Row> processedData =
+                  reader.getDatasetByMessageFormat(rowDataset, getProperties());
+
+              if (!processedData.isEmpty()) {
+                DataFrameWriter<Row> writer =
+                    processedData.write().mode(gcsWriteMode).format(gcsOutputFormat);
+
+                LOGGER.debug("Writing kafka data into GCS");
+                writer.format(gcsOutputFormat).save(gcsOutputLocation);
+              }
+              ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
+            });
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+
+  @Override
+  public void validateInput() throws IllegalArgumentException {
+    if (StringUtils.isAllBlank(gcsOutputLocation)
+        || StringUtils.isAllBlank(gcsOutputFormat)
+        || StringUtils.isAllBlank(gcsWriteMode)
+        || StringUtils.isAllBlank(kafkaBootstrapServers)
+        || StringUtils.isAllBlank(kafkaTopic)
+        || StringUtils.isAllBlank(kafkaMessageFormat)) {
+      LOGGER.error(
+          "{},{},{},{},{},{} are required parameters. ",
+          KAFKA_GCS_OUTPUT_LOCATION,
+          KAFKA_GCS_OUTPUT_FORMAT,
+          KAFKA_GCS_WRITE_MODE,
+          KAFKA_BOOTSTRAP_SERVERS,
+          KAFKA_TOPIC,
+          KAFKA_MESSAGE_FORMAT);
+      throw new IllegalArgumentException(
+          "Required parameters for KafkaTOGCSDstream not passed. "
+              + "Set mandatory parameter for KafkaTOGCSDstream template "
+              + "in resources/conf/template.properties file.");
+    }
+
+    if (kafkaMessageFormat.equals("json") && StringUtils.isAllBlank(kafkaSchemaUrl)) {
+      LOGGER.error("{} is a required parameter for JSON format messages", KAFKA_SCHEMA_URL);
+      throw new IllegalArgumentException("Required parameters for KafkaTOGCSDstream not passed.");
+    }
+
+    LOGGER.info(
+        "Starting GCS to GCS spark job with the following parameters:"
+            + "1. {}:{} "
+            + "2. {}:{} "
+            + "3. {}:{} "
+            + "4. {}:{} "
+            + "5. {}:{} "
+            + "6. {}:{} "
+            + "7. {}:{} "
+            + "8. {}:{} ",
+        KAFKA_MESSAGE_FORMAT,
+        kafkaMessageFormat,
+        KAFKA_GCS_OUTPUT_LOCATION,
+        gcsOutputLocation,
+        KAFKA_GCS_OUTPUT_FORMAT,
+        gcsOutputFormat,
+        KAFKA_GCS_WRITE_MODE,
+        gcsWriteMode,
+        KAFKA_BOOTSTRAP_SERVERS,
+        kafkaBootstrapServers,
+        KAFKA_TOPIC,
+        kafkaTopic,
+        KAFKA_BATCH_INTERVAL,
+        batchInterval,
+        KAFKA_SCHEMA_URL,
+        kafkaSchemaUrl);
+  }
+}
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md
index 0120188e7..73dcde91a 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md
@@ -237,4 +237,78 @@ bin/start.sh \
 --templateProperty kafka.pubsub.output.projectId=$GCP_PROJECT \
 --templateProperty kafka.pubsub.starting.offset=earliest \
 --templateProperty kafka.pubsub.await.termination.timeout=120000
+```
+
+
+## 5. Kafka To GCS via spark Direct stream
+
+General Execution:
+
+```
+GCP_PROJECT=<gcp-project-id> \
+REGION=<region> \
+SUBNET=<subnet> \
+GCS_STAGING_LOCATION=<gcs-staging-location> \
+HISTORY_SERVER_CLUSTER=<history-server-cluster> \
+bin/start.sh \
+-- --template KafkaToGCSDstream \
+--templateProperty project.id=<gcp-project-id> \
+--templateProperty kafka.gcs.output.location=<gcs-output-location> \
+--templateProperty kafka.bootstrap.servers=<kafka-bootstrap-servers> \
+--templateProperty kafka.topic=<kafka-topic> \
+--templateProperty kafka.starting.offset=<kafka-starting-offset> \
+--templateProperty kafka.message.format=<kafka-message-format> \
+--templateProperty kafka.gcs.write.mode=<kafka-gcs-write-mode> \
+--templateProperty kafka.gcs.batch.interval=<kafka-gcs-batch-interval> \
+--templateProperty kafka.gcs.consumer.group.id=<kafka-gcs-consumer-group-id> \
+--templateProperty kafka.gcs.output.format=<kafka-gcs-output-format> \
+--templateProperty kafka.schema.url=<kafka-schema-url>
+```
+
+### Configurable Parameters
+The following properties are available via the command line or the [template.properties](../../../../../../../resources/template.properties) file:
+
+```
+# Kafka to GCS via Dstream
+
+# Kafka servers
+kafka.bootstrap.servers=<kafka-bootstrap-servers>
+
+# Kafka topics
+kafka.topic=<kafka-topic>
+
+# Offset to start reading from. Accepted values: "earliest", "latest"
+kafka.starting.offset=<kafka-starting-offset>
+
+# Time interval in milliseconds for which data is collected before dispatching processing on it.
+kafka.gcs.batch.interval=<kafka-gcs-batch-interval>
+
+# Output mode for writing data. Accepted values: 'overwrite', 'append', 'ignore', 'error', 'errorifexists', 'default'. Default value: append
+kafka.gcs.write.mode=<kafka-gcs-write-mode>
+
+# Schema URL is required if 'kafka.message.format' is set to json
+kafka.schema.url=<kafka-schema-url>
+```
+
+
+### Example submission
+```
+export GCP_PROJECT=dp-test-project
+export REGION=us-central1
+export SUBNET=test-subnet
+export GCS_STAGING_LOCATION=gs://dp-templates-kafkatogcs/stg
+export GCS_OUTPUT_PATH=gs://dp-templates-kafkatogcs/output/
+bin/start.sh \
+-- --template KafkaToGCSDstream \
+--templateProperty project.id=$GCP_PROJECT \
+--templateProperty kafka.bootstrap.servers=102.1.1.20:9092 \
+--templateProperty kafka.topic=events-topic \
+--templateProperty kafka.starting.offset=latest \
+--templateProperty kafka.message.format=bytes \
+--templateProperty kafka.gcs.output.location=$GCS_OUTPUT_PATH \
+--templateProperty kafka.gcs.output.format=parquet \
+--templateProperty kafka.gcs.write.mode=append \
+--templateProperty kafka.gcs.batch.interval=60000 \
+--templateProperty kafka.gcs.consumer.group.id=test.group.id \
+--templateProperty kafka.schema.url=gs://dp-templates-kafkatogcs/schema/msg_schema.json
+```
\ No newline at end of file
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/main/DataProcTemplate.java b/java/src/main/java/com/google/cloud/dataproc/templates/main/DataProcTemplate.java
index 74c1f8fe2..087e0ff5f 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/main/DataProcTemplate.java
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/main/DataProcTemplate.java
@@ -35,6 +35,7 @@
 import com.google.cloud.dataproc.templates.jdbc.JDBCToSpanner;
 import com.google.cloud.dataproc.templates.kafka.KafkaToBQ;
 import com.google.cloud.dataproc.templates.kafka.KafkaToGCS;
+import com.google.cloud.dataproc.templates.kafka.KafkaToGCSDstream;
 import com.google.cloud.dataproc.templates.kafka.KafkaToPubSub;
 import com.google.cloud.dataproc.templates.pubsub.PubSubToBQ;
 import com.google.cloud.dataproc.templates.pubsub.PubSubToBigTable;
@@ -101,6 +102,7 @@ public class DataProcTemplate {
           .put(TemplateName.SNOWFLAKETOGCS, SnowflakeToGCS::of)
           .put(TemplateName.JDBCTOJDBC, JDBCToJDBC::of)
           .put(TemplateName.TEXTTOBIGQUERY, (args) -> new TextToBigquery())
+          .put(TemplateName.KAFKATOGCSDSTREAM, (args) -> new KafkaToGCSDstream())
           .build();
 
   private static final String TEMPLATE_NAME_LONG_OPT = "template";
   private static final String TEMPLATE_PROPERTY_LONG_OPT = "templateProperty";
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java b/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java
index 3649259c4..55169f546 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java
@@ -498,4 +498,10 @@ public interface TemplateConstants {
   String TEXT_BIGQUERY_DELIMITER_OPTION = "delimiter";
   String TEXT_BIGQUERY_INFERSCHEMA_OPTION = "inferSchema";
   String TEXT_BIGQUERY_HEADER_OPTION = "header";
+
+  /** Kafka To GCS - Direct stream properties */
+  String KAFKA_CONSUMER_GROUP_ID = "kafka.gcs.consumer.group.id";
+
+  String KAFKA_BATCH_INTERVAL = "kafka.gcs.batch.interval";
+  String KAFKA_GCS_WRITE_MODE = "kafka.gcs.write.mode";
 }
diff --git a/java/src/main/resources/template.properties b/java/src/main/resources/template.properties
index e1a125745..48cb45dbc 100644
--- a/java/src/main/resources/template.properties
+++ b/java/src/main/resources/template.properties
@@ -457,3 +457,8 @@ text.bigquery.output.dataset=
 text.bigquery.output.table=
 text.bigquery.output.mode=
 text.bigquery.temp.bucket=
+
+# Kafka TO GCS Dstream
+kafka.gcs.consumer.group.id=group-id1
+kafka.gcs.batch.interval=60000
+kafka.gcs.write.mode=Append
\ No newline at end of file
diff --git a/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java b/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java
new file mode 100644
index 000000000..9cd6ebff9
--- /dev/null
+++ b/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataproc.templates.kafka;
+
+import static com.google.cloud.dataproc.templates.util.TemplateConstants.*;
+import static com.google.cloud.dataproc.templates.util.TemplateConstants.KAFKA_SCHEMA_URL;
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.google.cloud.dataproc.templates.util.PropertyUtil;
+import java.util.stream.Stream;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaTOGCSDstreamTest {
+  private KafkaToGCSDstream kafkaTOGCSDstream;
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTOGCSDstreamTest.class);
+
+  @BeforeEach
+  void setup() {
+    SparkSession spark = SparkSession.builder().master("local").getOrCreate();
+  }
+
+  @ParameterizedTest
+  @MethodSource("propertyKeys")
+  void runTemplateWithValidParameters(String propKey) {
+    PropertyUtil.getProperties().setProperty(propKey, "some value");
+    PropertyUtil.getProperties().setProperty(KAFKA_GCS_OUTPUT_LOCATION, "gs://test-bucket");
+    PropertyUtil.getProperties().setProperty(KAFKA_GCS_OUTPUT_FORMAT, "bytes");
+    PropertyUtil.getProperties().setProperty(KAFKA_BOOTSTRAP_SERVERS, "value");
+    PropertyUtil.getProperties().setProperty(KAFKA_TOPIC, "value");
+    PropertyUtil.getProperties().setProperty(KAFKA_MESSAGE_FORMAT, "csv");
+    PropertyUtil.getProperties().setProperty(KAFKA_SCHEMA_URL, "");
+    PropertyUtil.getProperties().setProperty(KAFKA_GCS_WRITE_MODE, "append");
+    PropertyUtil.getProperties().setProperty(KAFKA_MESSAGE_FORMAT, "bytes");
+
+    kafkaTOGCSDstream = new KafkaToGCSDstream();
+    assertDoesNotThrow(kafkaTOGCSDstream::validateInput);
+  }
+
+  @ParameterizedTest
+  @MethodSource("propertyKeys")
+  void runTemplateWithInvalidParameters(String propKey) {
+    PropertyUtil.getProperties().setProperty(propKey, "");
+    kafkaTOGCSDstream = new KafkaToGCSDstream();
+
+    Exception exception =
+        assertThrows(IllegalArgumentException.class, () -> kafkaTOGCSDstream.validateInput());
+    assertEquals(
+        "Required parameters for KafkaTOGCSDstream not passed. "
+            + "Set mandatory parameter for KafkaTOGCSDstream template "
+            + "in resources/conf/template.properties file.",
+        exception.getMessage());
+  }
+
+  static Stream<String> propertyKeys() {
+    return Stream.of(
+        KAFKA_GCS_OUTPUT_LOCATION,
+        KAFKA_GCS_OUTPUT_FORMAT,
+        KAFKA_BOOTSTRAP_SERVERS,
+        KAFKA_TOPIC,
+        KAFKA_MESSAGE_FORMAT,
+        KAFKA_GCS_WRITE_MODE);
+  }
+}
" + + "Set mandatory parameter for KafkaTOGCSDstream template " + + "in resources/conf/template.properties file.", + exception.getMessage()); + } + + static Stream propertyKeys() { + return Stream.of( + KAFKA_GCS_OUTPUT_LOCATION, + KAFKA_GCS_OUTPUT_FORMAT, + KAFKA_BOOTSTRAP_SERVERS, + KAFKA_TOPIC, + KAFKA_MESSAGE_FORMAT, + KAFKA_GCS_WRITE_MODE); + } +} From b6d5bf1997ca9ba269011623315d05c3c5d5cf85 Mon Sep 17 00:00:00 2001 From: chakreshpatel Date: Sun, 31 Dec 2023 18:43:30 +0530 Subject: [PATCH 2/7] refactor: Added validations for the template --- java/pom.xml | 2 +- .../templates/kafka/KafkaToGCSDstream.java | 17 +++++++++++------ .../templates/kafka/KafkaTOGCSDstreamTest.java | 1 + 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index 3ca9ec726..7b3397bcc 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -324,7 +324,7 @@ org.apache.spark - spark-streaming-kafka-0-10_2.12 + spark-streaming-kafka-0-10_${scala.binary.version} ${spark.version} diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java index 1092850b3..d40ed3a81 100644 --- a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java +++ b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java @@ -147,15 +147,17 @@ public void validateInput() throws IllegalArgumentException { || StringUtils.isAllBlank(gcsWriteMode) || StringUtils.isAllBlank(kafkaBootstrapServers) || StringUtils.isAllBlank(kafkaTopic) - || StringUtils.isAllBlank(kafkaMessageFormat)) { + || StringUtils.isAllBlank(kafkaMessageFormat) + || StringUtils.isAllBlank(kafkaGroupId)) { LOGGER.error( - "{},{},{},{},{},{} are required parameter. ", + "{},{},{},{},{},{},{} are required parameter. ", KAFKA_GCS_OUTPUT_LOCATION, KAFKA_GCS_OUTPUT_FORMAT, KAFKA_GCS_WRITE_MODE, KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, - KAFKA_MESSAGE_FORMAT); + KAFKA_MESSAGE_FORMAT, + KAFKA_CONSUMER_GROUP_ID); throw new IllegalArgumentException( "Required parameters for KafkaTOGCSDstream not passed. " + "Set mandatory parameter for KafkaTOGCSDstream template " @@ -168,7 +170,7 @@ public void validateInput() throws IllegalArgumentException { } LOGGER.info( - "Starting GCS to GCS spark job with following parameters:" + "Starting kafka to GCS via DStream spark job with following parameters:" + "1. {}:{} " + "2. {}:{} " + "3. {}:{} " @@ -176,7 +178,8 @@ public void validateInput() throws IllegalArgumentException { + "5. {}:{} " + "6. {}:{} " + "7. {}:{} " - + "8. {}:{} ", + + "8. {}:{} " + + "9. 
{}:{} ", KAFKA_MESSAGE_FORMAT, kafkaMessageFormat, KAFKA_GCS_OUTPUT_LOCATION, @@ -192,6 +195,8 @@ public void validateInput() throws IllegalArgumentException { KAFKA_BATCH_INTERVAL, batchInterval, KAFKA_SCHEMA_URL, - kafkaSchemaUrl); + kafkaSchemaUrl, + KAFKA_CONSUMER_GROUP_ID, + kafkaGroupId); } } diff --git a/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java b/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java index 9cd6ebff9..54dbb299a 100644 --- a/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java +++ b/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java @@ -49,6 +49,7 @@ void runTemplateWithValidParameters(String propKey) { PropertyUtil.getProperties().setProperty(KAFKA_SCHEMA_URL, ""); PropertyUtil.getProperties().setProperty(KAFKA_GCS_WRITE_MODE, "append"); PropertyUtil.getProperties().setProperty(KAFKA_MESSAGE_FORMAT, "bytes"); + PropertyUtil.getProperties().setProperty(KAFKA_CONSUMER_GROUP_ID, "123"); kafkaTOGCSDstream = new KafkaToGCSDstream(); assertDoesNotThrow(kafkaTOGCSDstream::validateInput); From 8def950cab65d490c6ef79061be96b24f0c8da70 Mon Sep 17 00:00:00 2001 From: chakreshpatel Date: Tue, 2 Jan 2024 20:28:07 +0530 Subject: [PATCH 3/7] docs: added links in README for KafkaToGCSDstream template --- README.md | 1 + java/README.md | 1 + .../google/cloud/dataproc/templates/util/TemplateConstants.java | 1 + 3 files changed, 3 insertions(+) diff --git a/README.md b/README.md index fd3935545..a9a149cbf 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ Please refer to the [Dataproc Templates (Java - Spark) README](/java) for more i * [TextToBigquery](/java/src/main/java/com/google/cloud/dataproc/templates/gcs#7-text-to-bigquery) * [WordCount](/java/src/main/java/com/google/cloud/dataproc/templates/word/WordCount.java) * [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream) +* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream) ## Dataproc Templates (Python - PySpark) diff --git a/java/README.md b/java/README.md index a8101c93e..e6f5aebac 100644 --- a/java/README.md +++ b/java/README.md @@ -38,6 +38,7 @@ Please refer to the [Dataproc Templates (Java - Spark) README](java/README.md) f * [TextToBigquery](/java/src/main/java/com/google/cloud/dataproc/templates/gcs#7-text-to-bigquery) * [WordCount](/java/src/main/java/com/google/cloud/dataproc/templates/word/WordCount.java) * [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream) +* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream) ... 
From b877cbd71d4bc806a2eeb4e79f18f9146f8ee477 Mon Sep 17 00:00:00 2001
From: chakreshpatel
Date: Tue, 2 Jan 2024 21:13:05 +0530
Subject: [PATCH 4/7] ci: added stage for java Jenkinsfile

---
 java/.ci/Jenkinsfile                        | 24 +++++++++++++++++++
 java/src/main/resources/template.properties |  1 -
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/java/.ci/Jenkinsfile b/java/.ci/Jenkinsfile
index 661e1e757..ed6936969 100644
--- a/java/.ci/Jenkinsfile
+++ b/java/.ci/Jenkinsfile
@@ -919,6 +919,30 @@ pipeline {
                 }
             }
         }
+        stage('KAFKA TO GCS via Dstream (bytes)'){
+            steps {
+                retry(count: stageRetryCount) {
+                    sh '''
+
+                    export SKIP_BUILD=true
+                    cd java
+                    bin/start.sh \
+                    -- \
+                    --template KafkaToGCSDstream \
+                    --templateProperty project.id=$GCP_PROJECT \
+                    --templateProperty kafka.bootstrap.servers=$ENV_TEST_KAFKA_BOOTSTRAP_SERVERS \
+                    --templateProperty kafka.topic=integration-test \
+                    --templateProperty kafka.starting.offset=latest \
+                    --templateProperty kafka.gcs.output.location=gs://dataproc-templates/integration-testing/output/KAFKATOGCS/bytes \
+                    --templateProperty kafka.gcs.batch.interval=60000 \
+                    --templateProperty kafka.gcs.consumer.group.id=testgroupgcs \
+                    --templateProperty kafka.gcs.write.mode=append \
+                    --templateProperty kafka.message.format=bytes \
+                    --templateProperty kafka.gcs.output.format=parquet
+                    '''
+                }
+            }
+        }
     }
   }
 }
diff --git a/java/src/main/resources/template.properties b/java/src/main/resources/template.properties
index a7f0d7411..a9029c660 100644
--- a/java/src/main/resources/template.properties
+++ b/java/src/main/resources/template.properties
@@ -461,6 +461,5 @@ text.bigquery.temp.bucket=
 # Kafka TO BQ Dstream
 kafka.bq.batch.interval=60000
 # Kafka TO GCS Dstream
-kafka.gcs.consumer.group.id=group-id1
 kafka.gcs.batch.interval=60000
 kafka.gcs.write.mode=Append
\ No newline at end of file
From b97793a42e497cf531fd9a94c89fb7deca54e7b0 Mon Sep 17 00:00:00 2001
From: chakreshpatel
Date: Wed, 3 Jan 2024 12:51:51 +0530
Subject: [PATCH 5/7] ci: fixed int test java Jenkinsfile

---
 java/.ci/Jenkinsfile                                      |  3 ++-
 .../dataproc/templates/kafka/KafkaToGCSDstream.java       | 12 +++++++++---
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/java/.ci/Jenkinsfile b/java/.ci/Jenkinsfile
index ed6936969..a75966d9a 100644
--- a/java/.ci/Jenkinsfile
+++ b/java/.ci/Jenkinsfile
@@ -938,7 +938,8 @@ pipeline {
                     --templateProperty kafka.gcs.consumer.group.id=testgroupgcs \
                     --templateProperty kafka.gcs.write.mode=append \
                     --templateProperty kafka.message.format=bytes \
-                    --templateProperty kafka.gcs.output.format=parquet
+                    --templateProperty kafka.gcs.output.format=parquet \
+                    --templateProperty kafka.gcs.await.termination.timeout.ms=60000
                     '''
                 }
             }
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
index d40ed3a81..20904231a 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
@@ -61,6 +61,7 @@ public class KafkaToGCSDstream implements BaseTemplate {
   private String kafkaTopic;
   private final String sparkLogLevel;
   private final String kafkaSchemaUrl;
+  private Long kafkaAwaitTerminationTimeout;
 
   public KafkaToGCSDstream() {
 
@@ -75,6 +76,8 @@ public KafkaToGCSDstream() {
     gcsWriteMode = getProperties().getProperty(KAFKA_GCS_WRITE_MODE);
     sparkLogLevel = getProperties().getProperty(SPARK_LOG_LEVEL);
     kafkaSchemaUrl = getProperties().getProperty(KAFKA_SCHEMA_URL);
+    kafkaAwaitTerminationTimeout =
+        Long.valueOf(getProperties().getProperty(KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT));
   }
 
   @Override
@@ -137,7 +140,7 @@ public void runTemplate() throws TimeoutException, SQLException, InterruptedExce
         });
 
     ssc.start();
-    ssc.awaitTermination();
+    ssc.awaitTerminationOrTimeout(kafkaAwaitTerminationTimeout);
   }
 
   @Override
@@ -179,7 +182,8 @@ public void validateInput() throws IllegalArgumentException {
             + "6. {}:{} "
             + "7. {}:{} "
             + "8. {}:{} "
-            + "9. {}:{} ",
+            + "9. {}:{} "
+            + "10. {}:{} ",
         KAFKA_MESSAGE_FORMAT,
         kafkaMessageFormat,
         KAFKA_GCS_OUTPUT_LOCATION,
         gcsOutputLocation,
         KAFKA_GCS_OUTPUT_FORMAT,
         gcsOutputFormat,
         KAFKA_GCS_WRITE_MODE,
         gcsWriteMode,
         KAFKA_BOOTSTRAP_SERVERS,
         kafkaBootstrapServers,
         KAFKA_TOPIC,
         kafkaTopic,
         KAFKA_BATCH_INTERVAL,
         batchInterval,
         KAFKA_SCHEMA_URL,
         kafkaSchemaUrl,
         KAFKA_CONSUMER_GROUP_ID,
-        kafkaGroupId);
+        kafkaGroupId,
+        KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT,
+        kafkaAwaitTerminationTimeout);
   }
 }
From 896bf4b182c773a344a4322c9cfbeb24ec7fb796 Mon Sep 17 00:00:00 2001
From: chakreshpatel
Date: Thu, 4 Jan 2024 18:21:14 +0530
Subject: [PATCH 6/7] refactor: addressed review feedback for the KafkaToGCSDstream template

---
 README.md                                          |  4 +--
 java/.ci/Jenkinsfile                               | 29 ++++++++++++++++++-
 java/README.md                                     |  2 ++
 java/pom.xml                                       |  7 -----
 .../templates/kafka/KafkaToGCSDstream.java         | 12 ++++----
 .../templates/util/TemplateConstants.java          |  4 +--
 ...amTest.java => KafkaToGCSDstreamTest.java}      |  6 ++--
 7 files changed, 43 insertions(+), 21 deletions(-)
 rename java/src/test/java/com/google/cloud/dataproc/templates/kafka/{KafkaTOGCSDstreamTest.java => KafkaToGCSDstreamTest.java} (95%)

diff --git a/README.md b/README.md
index a9a149cbf..c02e82e38 100644
--- a/README.md
+++ b/README.md
@@ -37,7 +37,9 @@ Please refer to the [Dataproc Templates (Java - Spark) README](/java) for more i
 * [JDBCToJDBC](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#3-jdbc-to-jdbc)
 * [JDBCToSpanner](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#4-jdbc-to-spanner)
 * [KafkaToBQ](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#1-kafka-to-bigquery) (blogpost [link](https://medium.com/google-cloud/export-data-from-apache-kafka-to-bigquery-using-dataproc-serverless-4a666535117c))
+* [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream)
 * [KafkaToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md#2-kafka-to-gcs) (blogpost [link](https://medium.com/@pniralakeri/importing-data-from-kafka-to-gcs-using-dataproc-serverless-38e449d559f9))
+* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream)
 * [KafkaToPubSub](/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md#3-kafka-to-pubsub)
 * [MongoToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-mongo-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/migrating-data-from-mongo-to-gcs-using-java-and-dataproc-serverless-template-390500481804))
 * [PubSubToBigQuery](/java/src/main/java/com/google/cloud/dataproc/templates/pubsub#1-pubsub-to-bigquery) (blogpost [link](https://medium.com/google-cloud/from-pub-sub-to-bigquery-streaming-data-in-near-real-time-b550aeff595d))
@@ -50,8 +52,6 @@
 * [SpannerToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-spanner-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/cloud-spanner-export-query-results-using-dataproc-serverless-6f2f65b583a4))
 * [TextToBigquery](/java/src/main/java/com/google/cloud/dataproc/templates/gcs#7-text-to-bigquery)
 * [WordCount](/java/src/main/java/com/google/cloud/dataproc/templates/word/WordCount.java)
-* [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream)
-* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream)
 
 ## Dataproc Templates (Python - PySpark)
 
diff --git a/java/.ci/Jenkinsfile b/java/.ci/Jenkinsfile
index a75966d9a..0efee41fa 100644
--- a/java/.ci/Jenkinsfile
+++ b/java/.ci/Jenkinsfile
@@ -45,6 +45,7 @@ pipeline {
                     gsutil rm -r gs://dataproc-templates/integration-testing/checkpoint/KAFKATOBQ 2> /dev/null || true
                     gsutil rm -r gs://dataproc-templates/integration-testing/output/KAFKATOGCS 2> /dev/null || true
+                    gsutil rm -r gs://dataproc-templates/integration-testing/output/KAFKATOGCS_DStream 2> /dev/null || true
                 '''
             }
@@ -933,7 +934,7 @@ pipeline {
                     --templateProperty kafka.bootstrap.servers=$ENV_TEST_KAFKA_BOOTSTRAP_SERVERS \
                     --templateProperty kafka.topic=integration-test \
                     --templateProperty kafka.starting.offset=latest \
-                    --templateProperty kafka.gcs.output.location=gs://dataproc-templates/integration-testing/output/KAFKATOGCS/bytes \
+                    --templateProperty kafka.gcs.output.location=gs://dataproc-templates/integration-testing/output/KAFKATOGCS_DStream/bytes \
                     --templateProperty kafka.gcs.batch.interval=60000 \
                    --templateProperty kafka.gcs.consumer.group.id=testgroupgcs \
                     --templateProperty kafka.gcs.write.mode=append \
@@ -944,6 +945,32 @@ pipeline {
                    '''
                 }
             }
         }
+        stage('KAFKA TO GCS via Dstream (json)'){
+            steps {
+                retry(count: stageRetryCount) {
+                    sh '''
+
+                    export SKIP_BUILD=true
+                    cd java
+                    bin/start.sh \
+                    -- \
+                    --template KafkaToGCSDstream \
+                    --templateProperty project.id=$GCP_PROJECT \
+                    --templateProperty kafka.bootstrap.servers=$ENV_TEST_KAFKA_BOOTSTRAP_SERVERS \
+                    --templateProperty kafka.topic=integration-test-json \
+                    --templateProperty kafka.starting.offset=latest \
+                    --templateProperty kafka.gcs.output.location=gs://dataproc-templates/integration-testing/output/KAFKATOGCS_DStream/json \
+                    --templateProperty kafka.gcs.batch.interval=60000 \
+                    --templateProperty kafka.gcs.consumer.group.id=testgroupjson \
+                    --templateProperty kafka.gcs.write.mode=append \
+                    --templateProperty kafka.message.format=json \
+                    --templateProperty kafka.schema.url=gs://dataproc-templates/integration-testing/schema.json \
+                    --templateProperty kafka.gcs.output.format=parquet \
+                    --templateProperty kafka.gcs.await.termination.timeout.ms=60000
+                    '''
+                }
+            }
+        }
     }
 
diff --git a/java/README.md b/java/README.md
index e6f5aebac..40ac71cff 100644
--- a/java/README.md
+++ b/java/README.md
@@ -24,7 +24,9 @@ Please refer to the [Dataproc Templates (Java - Spark) README](java/README.md) f
 * [JDBCToJDBC](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#3-jdbc-to-jdbc)
 * [JDBCToSpanner](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#4-jdbc-to-spanner)
 * [KafkaToBQ](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#1-kafka-to-bigquery) (blogpost [link](https://medium.com/google-cloud/export-data-from-apache-kafka-to-bigquery-using-dataproc-serverless-4a666535117c))
+* [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream)
 * [KafkaToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#2-kafka-to-gcs) (blogpost [link](https://medium.com/@pniralakeri/importing-data-from-kafka-to-gcs-using-dataproc-serverless-38e449d559f9))
+* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream)
 * [KafkaToPubSub](/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md#3-kafka-to-pubsub)
 * [MongoToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-mongo-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/migrating-data-from-mongo-to-gcs-using-java-and-dataproc-serverless-template-390500481804))
 * [PubSubToBigQuery](/java/src/main/java/com/google/cloud/dataproc/templates/pubsub#1-pubsub-to-bigquery) (blogpost [link](https://medium.com/google-cloud/from-pub-sub-to-bigquery-streaming-data-in-near-real-time-b550aeff595d))
diff --git a/java/pom.xml b/java/pom.xml
index 88af1aa8c..75168d24e 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -327,13 +327,6 @@
       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
-
-
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
 
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
index 20904231a..f597f2d0e 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java
@@ -71,8 +71,8 @@ public KafkaToGCSDstream() {
     gcsOutputLocation = getProperties().getProperty(KAFKA_GCS_OUTPUT_LOCATION);
     gcsOutputFormat = getProperties().getProperty(KAFKA_GCS_OUTPUT_FORMAT);
     kafkaStartingOffsets = getProperties().getProperty(KAFKA_STARTING_OFFSET);
-    kafkaGroupId = getProperties().getProperty(KAFKA_CONSUMER_GROUP_ID);
-    batchInterval = Long.parseLong(getProperties().getProperty(KAFKA_BATCH_INTERVAL));
+    kafkaGroupId = getProperties().getProperty(KAFKA_GCS_CONSUMER_GROUP_ID);
+    batchInterval = Long.parseLong(getProperties().getProperty(KAFKA_GCS_BATCH_INTERVAL));
     gcsWriteMode = getProperties().getProperty(KAFKA_GCS_WRITE_MODE);
     sparkLogLevel = getProperties().getProperty(SPARK_LOG_LEVEL);
     kafkaSchemaUrl = getProperties().getProperty(KAFKA_SCHEMA_URL);
@@ -160,7 +160,7 @@ public void validateInput() throws IllegalArgumentException {
           KAFKA_BOOTSTRAP_SERVERS,
           KAFKA_TOPIC,
           KAFKA_MESSAGE_FORMAT,
-          KAFKA_CONSUMER_GROUP_ID);
+          KAFKA_GCS_CONSUMER_GROUP_ID);
       throw new IllegalArgumentException(
           "Required parameters for KafkaTOGCSDstream not passed. "
               + "Set mandatory parameter for KafkaTOGCSDstream template "
               + "in resources/conf/template.properties file.");
     }
 
     if (kafkaMessageFormat.equals("json") && StringUtils.isAllBlank(kafkaSchemaUrl)) {
       LOGGER.error("{} is a required parameter for JSON format messages", KAFKA_SCHEMA_URL);
-      throw new IllegalArgumentException("Required parameters for KafkaTOGCSDstream not passed.");
+      throw new IllegalArgumentException("Required parameters for KafkaToGCSDstream not passed.");
     }
 
     LOGGER.info(
@@ -196,11 +196,11 @@ public void validateInput() throws IllegalArgumentException {
         kafkaBootstrapServers,
         KAFKA_TOPIC,
         kafkaTopic,
-        KAFKA_BATCH_INTERVAL,
+        KAFKA_GCS_BATCH_INTERVAL,
         batchInterval,
         KAFKA_SCHEMA_URL,
         kafkaSchemaUrl,
-        KAFKA_CONSUMER_GROUP_ID,
+        KAFKA_GCS_CONSUMER_GROUP_ID,
         kafkaGroupId,
         KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT,
         kafkaAwaitTerminationTimeout);
diff --git a/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java b/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java
index 409b1f320..2631a6926 100644
--- a/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java
+++ b/java/src/main/java/com/google/cloud/dataproc/templates/util/TemplateConstants.java
@@ -505,8 +505,8 @@ public interface TemplateConstants {
   String KAFKA_BQ_BATCH_INTERVAL = "kafka.bq.batch.interval";
 
   /** Kafka To GCS - Direct stream properties */
-  String KAFKA_CONSUMER_GROUP_ID = "kafka.gcs.consumer.group.id";
+  String KAFKA_GCS_CONSUMER_GROUP_ID = "kafka.gcs.consumer.group.id";
 
-  String KAFKA_BATCH_INTERVAL = "kafka.gcs.batch.interval";
+  String KAFKA_GCS_BATCH_INTERVAL = "kafka.gcs.batch.interval";
   String KAFKA_GCS_WRITE_MODE = "kafka.gcs.write.mode";
 }
diff --git a/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java b/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstreamTest.java
similarity index 95%
rename from java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java
rename to java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstreamTest.java
index 54dbb299a..95b25e5f0 100644
--- a/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaTOGCSDstreamTest.java
+++ b/java/src/test/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstreamTest.java
@@ -28,9 +28,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaTOGCSDstreamTest {
+public class KafkaToGCSDstreamTest {
   private KafkaToGCSDstream kafkaTOGCSDstream;
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTOGCSDstreamTest.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaToGCSDstreamTest.class);
 
   @BeforeEach
   void setup() {
@@ -49,7 +49,7 @@ void runTemplateWithValidParameters(String propKey) {
     PropertyUtil.getProperties().setProperty(KAFKA_SCHEMA_URL, "");
     PropertyUtil.getProperties().setProperty(KAFKA_GCS_WRITE_MODE, "append");
     PropertyUtil.getProperties().setProperty(KAFKA_MESSAGE_FORMAT, "bytes");
-    PropertyUtil.getProperties().setProperty(KAFKA_CONSUMER_GROUP_ID, "123");
+    PropertyUtil.getProperties().setProperty(KAFKA_GCS_CONSUMER_GROUP_ID, "123");
 
     kafkaTOGCSDstream = new KafkaToGCSDstream();
     assertDoesNotThrow(kafkaTOGCSDstream::validateInput);
From 34bc48af181aa0d7d7a62a7854f0ad7205c709a1 Mon Sep 17 00:00:00 2001
From: chakreshpatel
Date: Thu, 4 Jan 2024 18:27:53 +0530
Subject: [PATCH 7/7] ci: fixed int test java Jenkinsfile

---
 java/.ci/Jenkinsfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/java/.ci/Jenkinsfile b/java/.ci/Jenkinsfile
index 0efee41fa..0e4fc1e07 100644
--- a/java/.ci/Jenkinsfile
+++ b/java/.ci/Jenkinsfile
@@ -970,7 +970,7 @@ pipeline {
                     '''
                 }
             }
-
+        }
     }
   }
 }
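To exercise the template end to end, something has to publish messages to the source topic. A minimal producer sketch is shown below; the broker address and topic name are taken from the example submission in the README section above, and a producer like this is not part of the PR itself.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventsTopicProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "102.1.1.20:9092"); // broker from the example submission
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Each record becomes a row with "key" and "value" columns in the
    // parquet output under kafka.gcs.output.location.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>("events-topic", "key-" + i, "value-" + i));
      }
    }
  }
}
```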