From 1ef66e389d5ee23e9d9927dbf49e8960106f8507 Mon Sep 17 00:00:00 2001 From: elakito Date: Thu, 22 Sep 2022 16:18:04 +0200 Subject: [PATCH] Check if the record has a struct-schema (#135) --- .../connect/sink/hana/HANASinkRecordsCollector.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala b/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala index 6293ffb..42ff5d7 100644 --- a/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala +++ b/src/main/scala/com/sap/kafka/connect/sink/hana/HANASinkRecordsCollector.scala @@ -1,7 +1,6 @@ package com.sap.kafka.connect.sink.hana import java.sql.Connection - import com.sap.kafka.client.hana.{HANAConfigInvalidInputException, HANAConfigMissingException, HANAJdbcClient} import com.sap.kafka.client.{MetaSchema, metaAttr} import com.sap.kafka.connect.config.BaseConfigConstants @@ -9,6 +8,7 @@ import com.sap.kafka.connect.config.hana.HANAConfig import com.sap.kafka.schema.KeyValueSchema import com.sap.kafka.utils.hana.HANAJdbcTypeConverter import com.sap.kafka.utils.SchemaNotMatchedException +import org.apache.kafka.connect.data.Schema.Type import org.apache.kafka.connect.data.{Field, Schema} import org.apache.kafka.connect.sink.SinkRecord import org.slf4j.{Logger, LoggerFactory} @@ -60,7 +60,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient, var recordFields = Seq[metaAttr]() //REVISIT we should cache keySchema and valueSchema at the collector to perform a quick comparison // we build recordFields for valueSchema to compare it against metaSchema retrieved from the table - if (recordSchema.keySchema != null) { + if (recordSchema.keySchema != null && recordSchema.keySchema.`type` == Type.STRUCT) { for (field <- recordSchema.keySchema.fields.asScala) { allFields.add(field.name) val fieldSchema: Schema = field.schema @@ -70,7 +70,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient, } } - if (recordSchema.valueSchema != null) { + if (recordSchema.valueSchema != null && recordSchema.valueSchema.`type` == Type.STRUCT) { for (field <- recordSchema.valueSchema.fields.asScala) { if (!allFields.contains(field.name)) { val fieldSchema: Schema = field.schema @@ -120,7 +120,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient, metaSchema = new MetaSchema(Seq[metaAttr](), Seq[Field]()) - if (recordSchema.keySchema != null) { + if (recordSchema.keySchema != null && recordSchema.keySchema.`type` == Type.STRUCT) { for (field <- recordSchema.keySchema.fields.asScala) { allFields.add(field.name) val fieldSchema: Schema = field.schema @@ -131,7 +131,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient, } } - if (recordSchema.valueSchema != null) { + if (recordSchema.valueSchema != null && recordSchema.valueSchema.`type` == Type.STRUCT) { for (field <- recordSchema.valueSchema.fields.asScala) { if (!allFields.contains(field.name)) { val fieldSchema: Schema = field.schema