Merge pull request #899 from GoogleCloudPlatform/kafka-gcs-dstream
feat: Built Java KafkaToGCSDstream template for Dataproc
vanshaj-bhatia authored Jan 8, 2024
2 parents 855635d + 34bc48a commit 7bc3e09
Showing 12 changed files with 441 additions and 4 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -37,7 +37,9 @@ Please refer to the [Dataproc Templates (Java - Spark) README](/java) for more i
* [JDBCToJDBC](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#3-jdbc-to-jdbc)
* [JDBCToSpanner](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#4-jdbc-to-spanner)
* [KafkaToBQ](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#1-kafka-to-bigquery) (blogpost [link](https://medium.com/google-cloud/export-data-from-apache-kafka-to-bigquery-using-dataproc-serverless-4a666535117c))
* [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream)
* [KafkaToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md#2-kafka-to-gcs) (blogpost [link](https://medium.com/@pniralakeri/importing-data-from-kafka-to-gcs-using-dataproc-serverless-38e449d559f9))
* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream)
* [KafkaToPubSub](/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md#3-kafka-to-pubsub)
* [MongoToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-mongo-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/migrating-data-from-mongo-to-gcs-using-java-and-dataproc-serverless-template-390500481804))
* [PubSubToBigQuery](/java/src/main/java/com/google/cloud/dataproc/templates/pubsub#1-pubsub-to-bigquery) (blogpost [link](https://medium.com/google-cloud/from-pub-sub-to-bigquery-streaming-data-in-near-real-time-b550aeff595d))
@@ -50,7 +52,6 @@ Please refer to the [Dataproc Templates (Java - Spark) README](/java) for more i
* [SpannerToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-spanner-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/cloud-spanner-export-query-results-using-dataproc-serverless-6f2f65b583a4))
* [TextToBigquery](/java/src/main/java/com/google/cloud/dataproc/templates/gcs#7-text-to-bigquery)
* [WordCount](/java/src/main/java/com/google/cloud/dataproc/templates/word/WordCount.java)
* [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream)


## Dataproc Templates (Python - PySpark)
52 changes: 52 additions & 0 deletions java/.ci/Jenkinsfile
@@ -45,6 +45,7 @@ pipeline {
gsutil rm -r gs://dataproc-templates/integration-testing/checkpoint/KAFKATOBQ 2> /dev/null || true
gsutil rm -r gs://dataproc-templates/integration-testing/output/KAFKATOGCS 2> /dev/null || true
gsutil rm -r gs://dataproc-templates/integration-testing/output/KAFKATOGCS_DStream 2> /dev/null || true
'''
}
@@ -919,6 +920,57 @@ pipeline {
}
}
}
stage('KAFKA TO GCS via Dstream (bytes)'){
steps {
retry(count: stageRetryCount) {
sh '''
export SKIP_BUILD=true
cd java
bin/start.sh \
-- \
--template KafkaToGCSDstream \
--templateProperty project.id=$GCP_PROJECT \
--templateProperty kafka.bootstrap.servers=$ENV_TEST_KAFKA_BOOTSTRAP_SERVERS \
--templateProperty kafka.topic=integration-test \
--templateProperty kafka.starting.offset=latest \
--templateProperty kafka.gcs.output.location=gs://dataproc-templates/integration-testing/output/KAFKATOGCS_DStream/bytes \
--templateProperty kafka.gcs.batch.interval=60000 \
--templateProperty kafka.gcs.consumer.group.id=testgroupgcs \
--templateProperty kafka.gcs.write.mode=append \
--templateProperty kafka.message.format=bytes \
--templateProperty kafka.gcs.output.format=parquet \
--templateProperty kafka.gcs.await.termination.timeout.ms=60000
'''
}
}
}
stage('KAFKA TO GCS via Dstream (json)'){
steps {
retry(count: stageRetryCount) {
sh '''
export SKIP_BUILD=true
cd java
bin/start.sh \
-- \
--template KafkaToGCSDstream \
--templateProperty project.id=$GCP_PROJECT \
--templateProperty kafka.bootstrap.servers=$ENV_TEST_KAFKA_BOOTSTRAP_SERVERS \
--templateProperty kafka.topic=integration-test-json \
--templateProperty kafka.starting.offset=latest \
--templateProperty kafka.gcs.output.location=gs://dataproc-templates/integration-testing/output/KAFKATOGCS_DStream/json \
--templateProperty kafka.gcs.batch.interval=60000 \
--templateProperty kafka.gcs.consumer.group.id=testgroupjson \
--templateProperty kafka.gcs.write.mode=append \
--templateProperty kafka.message.format=json \
--templateProperty kafka.schema.url=gs://dataproc-templates/integration-testing/schema.json \
--templateProperty kafka.gcs.output.format=parquet \
--templateProperty kafka.gcs.await.termination.timeout.ms=60000
'''
}
}
}
}
}
}
3 changes: 3 additions & 0 deletions java/README.md
@@ -24,7 +24,9 @@ Please refer to the [Dataproc Templates (Java - Spark) README](java/README.md) f
* [JDBCToJDBC](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#3-jdbc-to-jdbc)
* [JDBCToSpanner](/java/src/main/java/com/google/cloud/dataproc/templates/jdbc#4-jdbc-to-spanner)
* [KafkaToBQ](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#1-kafka-to-bigquery) (blogpost [link](https://medium.com/google-cloud/export-data-from-apache-kafka-to-bigquery-using-dataproc-serverless-4a666535117c))
* [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream)
* [KafkaToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#2-kafka-to-gcs) (blogpost [link](https://medium.com/@pniralakeri/importing-data-from-kafka-to-gcs-using-dataproc-serverless-38e449d559f9))
* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream)
* [KafkaToPubSub](/java/src/main/java/com/google/cloud/dataproc/templates/kafka/README.md#3-kafka-to-pubsub)
* [MongoToGCS](/java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-mongo-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/migrating-data-from-mongo-to-gcs-using-java-and-dataproc-serverless-template-390500481804))
* [PubSubToBigQuery](/java/src/main/java/com/google/cloud/dataproc/templates/pubsub#1-pubsub-to-bigquery) (blogpost [link](https://medium.com/google-cloud/from-pub-sub-to-bigquery-streaming-data-in-near-real-time-b550aeff595d))
@@ -38,6 +40,7 @@ Please refer to the [Dataproc Templates (Java - Spark) README](java/README.md) f
* [TextToBigquery](/java/src/main/java/com/google/cloud/dataproc/templates/gcs#7-text-to-bigquery)
* [WordCount](/java/src/main/java/com/google/cloud/dataproc/templates/word/WordCount.java)
* [KafkaToBQDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#4-kafka-to-bq-via-spark-direct-stream)
* [KafkaToGCSDstream](/java/src/main/java/com/google/cloud/dataproc/templates/kafka#5-kafka-to-gcs-via-spark-direct-stream)

...

1 change: 0 additions & 1 deletion java/pom.xml
@@ -321,7 +321,6 @@
<artifactId>google-cloud-bigtable</artifactId>
<version>2.1.0</version>
</dependency>

<!-- Spark kafka Direct stream-->
<dependency>
<groupId>org.apache.spark</groupId>
@@ -58,7 +58,8 @@ enum TemplateName {
TEXTTOBIGQUERY,
JDBCTOJDBC,
PUBSUBLITETOBIGTABLE,
KAFKATOBQDSTREAM
KAFKATOBQDSTREAM,
KAFKATOGCSDSTREAM
}

default Properties getProperties() {
@@ -62,7 +62,13 @@ public Dataset<Row> readKafkaTopic(SparkSession spark, Properties prop) {

// check on memory constraints

return getDatasetByMessageFormat(inputData, prop);
}

public Dataset<Row> getDatasetByMessageFormat(Dataset<Row> inputData, Properties prop) {
Dataset<Row> processedData = null;
String kakfaMessageFormat = prop.getProperty(KAFKA_MESSAGE_FORMAT);
String kafkaSchemaUrl = prop.getProperty(KAFKA_SCHEMA_URL);

switch (kakfaMessageFormat) {
case "bytes":
@@ -0,0 +1,208 @@
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataproc.templates.kafka;

import static com.google.cloud.dataproc.templates.util.TemplateConstants.*;

import com.google.cloud.dataproc.templates.BaseTemplate;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
* Spark job to move data and/or schema from a Kafka topic to GCS via a Spark Direct Stream. This
* template can be configured to run in a few different modes; in the default mode,
* kafka.gcs.write.mode is set to "append". For a detailed list of properties, refer to the
* "KafkaToGCSDstream Template properties" section in the resources/template.properties file.
*/
public class KafkaToGCSDstream implements BaseTemplate {

public static final Logger LOGGER = LoggerFactory.getLogger(KafkaToGCSDstream.class);

private long batchInterval;
private String kafkaMessageFormat;
private String gcsOutputLocation;
private String gcsWriteMode;
private String gcsOutputFormat;
private String kafkaStartingOffsets;
private String kafkaGroupId;
private String kafkaBootstrapServers;
private String kafkaTopic;
private final String sparkLogLevel;
private final String kafkaSchemaUrl;
private Long kafkaAwaitTerminationTimeout;

public KafkaToGCSDstream() {

kafkaBootstrapServers = getProperties().getProperty(KAFKA_BOOTSTRAP_SERVERS);
kafkaTopic = getProperties().getProperty(KAFKA_TOPIC);
kafkaMessageFormat = getProperties().getProperty(KAFKA_MESSAGE_FORMAT);
gcsOutputLocation = getProperties().getProperty(KAFKA_GCS_OUTPUT_LOCATION);
gcsOutputFormat = getProperties().getProperty(KAFKA_GCS_OUTPUT_FORMAT);
kafkaStartingOffsets = getProperties().getProperty(KAFKA_STARTING_OFFSET);
kafkaGroupId = getProperties().getProperty(KAFKA_GCS_CONSUMER_GROUP_ID);
batchInterval = Long.parseLong(getProperties().getProperty(KAFKA_GCS_BATCH_INTERVAL));
gcsWriteMode = getProperties().getProperty(KAFKA_GCS_WRITE_MODE);
sparkLogLevel = getProperties().getProperty(SPARK_LOG_LEVEL);
kafkaSchemaUrl = getProperties().getProperty(KAFKA_SCHEMA_URL);
kafkaAwaitTerminationTimeout =
Long.valueOf(getProperties().getProperty(KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT));
}

@Override
public void runTemplate() throws TimeoutException, SQLException, InterruptedException {

SparkSession spark =
SparkSession.builder().appName("Kafka to GCS via Direct stream").getOrCreate();

spark.sparkContext().setLogLevel(sparkLogLevel);

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", kafkaBootstrapServers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", kafkaGroupId);
kafkaParams.put("auto.offset.reset", kafkaStartingOffsets);
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Collections.singletonList(kafkaTopic);

JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());

JavaStreamingContext ssc = new JavaStreamingContext(sparkContext, new Duration(batchInterval));
KafkaReader reader = new KafkaReader();

JavaInputDStream<ConsumerRecord<Object, Object>> stream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));

stream.foreachRDD(
(VoidFunction2<JavaRDD<ConsumerRecord<Object, Object>>, Time>)
(rdd, time) -> {
LOGGER.debug("Reading kafka data");

OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

JavaRDD<Tuple2<String, String>> recordRdd =
rdd.map(record -> new Tuple2(record.key(), record.value()));

Dataset<Row> rowDataset =
spark
.createDataset(
recordRdd.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.withColumnRenamed("_1", "key")
.withColumnRenamed("_2", "value");

Dataset<Row> processedData =
reader.getDatasetByMessageFormat(rowDataset, getProperties());

if (!processedData.isEmpty()) {
DataFrameWriter<Row> writer =
processedData.write().mode(gcsWriteMode).format(gcsOutputFormat);

LOGGER.debug("Writing kafka data into GCS");
writer.format(gcsOutputFormat).save(gcsOutputLocation);
}
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

ssc.start();
ssc.awaitTerminationOrTimeout(kafkaAwaitTerminationTimeout);
}

@Override
public void validateInput() throws IllegalArgumentException {
if (StringUtils.isAllBlank(gcsOutputLocation)
|| StringUtils.isAllBlank(gcsOutputFormat)
|| StringUtils.isAllBlank(gcsWriteMode)
|| StringUtils.isAllBlank(kafkaBootstrapServers)
|| StringUtils.isAllBlank(kafkaTopic)
|| StringUtils.isAllBlank(kafkaMessageFormat)
|| StringUtils.isAllBlank(kafkaGroupId)) {
LOGGER.error(
"{},{},{},{},{},{},{} are required parameter. ",
KAFKA_GCS_OUTPUT_LOCATION,
KAFKA_GCS_OUTPUT_FORMAT,
KAFKA_GCS_WRITE_MODE,
KAFKA_BOOTSTRAP_SERVERS,
KAFKA_TOPIC,
KAFKA_MESSAGE_FORMAT,
KAFKA_GCS_CONSUMER_GROUP_ID);
throw new IllegalArgumentException(
"Required parameters for KafkaTOGCSDstream not passed. "
+ "Set mandatory parameter for KafkaTOGCSDstream template "
+ "in resources/conf/template.properties file.");
}

if (kafkaMessageFormat.equals("json") & StringUtils.isAllBlank(kafkaSchemaUrl)) {
LOGGER.error("{} is a required parameter for JSON format messages", KAFKA_SCHEMA_URL);
throw new IllegalArgumentException("Required parameters for KafkaToGCSDstream not passed.");
}

LOGGER.info(
"Starting kafka to GCS via DStream spark job with following parameters:"
+ "1. {}:{} "
+ "2. {}:{} "
+ "3. {}:{} "
+ "4. {}:{} "
+ "5. {}:{} "
+ "6. {}:{} "
+ "7. {}:{} "
+ "8. {}:{} "
+ "9. {}:{} "
+ "10. {}:{} ",
KAFKA_MESSAGE_FORMAT,
kafkaMessageFormat,
KAFKA_GCS_OUTPUT_LOCATION,
gcsOutputLocation,
KAFKA_GCS_OUTPUT_FORMAT,
gcsOutputFormat,
KAFKA_GCS_WRITE_MODE,
gcsWriteMode,
KAFKA_BOOTSTRAP_SERVERS,
kafkaBootstrapServers,
KAFKA_TOPIC,
kafkaTopic,
KAFKA_GCS_BATCH_INTERVAL,
batchInterval,
KAFKA_SCHEMA_URL,
kafkaSchemaUrl,
KAFKA_GCS_CONSUMER_GROUP_ID,
kafkaGroupId,
KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT,
kafkaAwaitTerminationTimeout);
}
}
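For reference, the new template can be launched outside Jenkins through the same bin/start.sh wrapper used in the integration-test stages above, supplying the properties that validateInput() checks for. The sketch below covers the json path; the project, broker, topic, bucket, and schema values are placeholders rather than values taken from this PR, and bin/start.sh may also expect additional environment variables (for example a region and a staging bucket) as described in the java README.

    # Hypothetical invocation; every value below is a placeholder.
    export GCP_PROJECT=my-project-id
    cd java
    bin/start.sh \
    -- \
    --template KafkaToGCSDstream \
    --templateProperty project.id=$GCP_PROJECT \
    --templateProperty kafka.bootstrap.servers=broker-1:9092 \
    --templateProperty kafka.topic=my-topic \
    --templateProperty kafka.starting.offset=latest \
    --templateProperty kafka.gcs.output.location=gs://my-bucket/kafka-output \
    --templateProperty kafka.gcs.batch.interval=60000 \
    --templateProperty kafka.gcs.consumer.group.id=my-consumer-group \
    --templateProperty kafka.gcs.write.mode=append \
    --templateProperty kafka.message.format=json \
    --templateProperty kafka.schema.url=gs://my-bucket/schema.json \
    --templateProperty kafka.gcs.output.format=parquet \
    --templateProperty kafka.gcs.await.termination.timeout.ms=60000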