Skip to content

Commit

Permalink
Check if the record has a struct-schema (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
elakito authored Sep 22, 2022
1 parent dd4865e commit 1ef66e3
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.sap.kafka.connect.sink.hana

import java.sql.Connection

import com.sap.kafka.client.hana.{HANAConfigInvalidInputException, HANAConfigMissingException, HANAJdbcClient}
import com.sap.kafka.client.{MetaSchema, metaAttr}
import com.sap.kafka.connect.config.BaseConfigConstants
import com.sap.kafka.connect.config.hana.HANAConfig
import com.sap.kafka.schema.KeyValueSchema
import com.sap.kafka.utils.hana.HANAJdbcTypeConverter
import com.sap.kafka.utils.SchemaNotMatchedException
import org.apache.kafka.connect.data.Schema.Type
import org.apache.kafka.connect.data.{Field, Schema}
import org.apache.kafka.connect.sink.SinkRecord
import org.slf4j.{Logger, LoggerFactory}
Expand Down Expand Up @@ -60,7 +60,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient,
var recordFields = Seq[metaAttr]()
//REVISIT we should cache keySchema and valueSchema at the collector to perform a quick comparison
// we build recordFields for valueSchema to compare it against metaSchema retrieved from the table
if (recordSchema.keySchema != null) {
if (recordSchema.keySchema != null && recordSchema.keySchema.`type` == Type.STRUCT) {
for (field <- recordSchema.keySchema.fields.asScala) {
allFields.add(field.name)
val fieldSchema: Schema = field.schema
Expand All @@ -70,7 +70,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient,
}
}

if (recordSchema.valueSchema != null) {
if (recordSchema.valueSchema != null && recordSchema.valueSchema.`type` == Type.STRUCT) {
for (field <- recordSchema.valueSchema.fields.asScala) {
if (!allFields.contains(field.name)) {
val fieldSchema: Schema = field.schema
Expand Down Expand Up @@ -120,7 +120,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient,

metaSchema = new MetaSchema(Seq[metaAttr](), Seq[Field]())

if (recordSchema.keySchema != null) {
if (recordSchema.keySchema != null && recordSchema.keySchema.`type` == Type.STRUCT) {
for (field <- recordSchema.keySchema.fields.asScala) {
allFields.add(field.name)
val fieldSchema: Schema = field.schema
Expand All @@ -131,7 +131,7 @@ class HANASinkRecordsCollector(var tableName: String, client: HANAJdbcClient,
}
}

if (recordSchema.valueSchema != null) {
if (recordSchema.valueSchema != null && recordSchema.valueSchema.`type` == Type.STRUCT) {
for (field <- recordSchema.valueSchema.fields.asScala) {
if (!allFields.contains(field.name)) {
val fieldSchema: Schema = field.schema
Expand Down

0 comments on commit 1ef66e3

Please sign in to comment.