diff --git a/src/main/scala/com/sap/kafka/connect/source/querier/TableQuerier.scala b/src/main/scala/com/sap/kafka/connect/source/querier/TableQuerier.scala index 38c732e..e2334dc 100644 --- a/src/main/scala/com/sap/kafka/connect/source/querier/TableQuerier.scala +++ b/src/main/scala/com/sap/kafka/connect/source/querier/TableQuerier.scala @@ -1,15 +1,13 @@ package com.sap.kafka.connect.source.querier import com.sap.kafka.client.hana.HANAJdbcClient -import com.sap.kafka.connect.config.{BaseConfig, BaseConfigConstants} import com.sap.kafka.connect.config.hana.HANAConfig +import com.sap.kafka.connect.config.{BaseConfig, BaseConfigConstants} import com.sap.kafka.utils.hana.HANAJdbcTypeConverter import org.apache.kafka.connect.data.{Schema, Struct} import org.apache.kafka.connect.source.SourceRecord import org.slf4j.LoggerFactory -import scala.util.Random - abstract class TableQuerier(mode: String, tableOrQuery: String, topic: String, config: BaseConfig, var jdbcClient: Option[HANAJdbcClient]) @@ -92,7 +90,7 @@ abstract class TableQuerier(mode: String, tableOrQuery: String, case BaseConfigConstants.QUERY_MODE_SQL => if (getOrCreateJdbcClient().get.isInstanceOf[HANAJdbcClient]) { val metadata = getOrCreateJdbcClient().get.getMetadata(tableOrQuery) - HANAJdbcTypeConverter.convertHANAMetadataToSchema("Query" + Random.nextInt, metadata, options) + HANAJdbcTypeConverter.convertHANAMetadataToSchema(topic, metadata, options) } else { throw new RuntimeException("Jdbc Client is not available") }