From 6eaa55af2a3556770234beec47e262ad281058d6 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Thu, 17 Oct 2024 17:37:22 -0400 Subject: [PATCH 1/3] Implemented support for preserving writetimes & TTL on tables with only collection --- .../com/datastax/cdm/cql/EnhancedSession.java | 43 ++++---- .../cdm/cql/codec/BIGINT_BigIntegerCodec.java | 70 ++++++++++++ .../cdm/cql/codec/BigInteger_BIGINTCodec.java | 69 ++++++++++++ .../datastax/cdm/cql/codec/CodecFactory.java | 3 + .../com/datastax/cdm/cql/codec/Codecset.java | 4 +- .../datastax/cdm/feature/WritetimeTTL.java | 40 ++++++- .../cdm/properties/KnownProperties.java | 3 + .../com/datastax/cdm/schema/CqlTable.java | 10 +- .../cdm/cql/codec/ASCII_BLOBCodecTest.java | 9 ++ .../cql/codec/BIGINT_BigIntegerCodecTest.java | 100 ++++++++++++++++++ .../cdm/cql/codec/BLOB_ASCIICodecTest.java | 10 ++ .../cdm/cql/codec/BLOB_TextCodecTest.java | 11 ++ .../cql/codec/BigInteger_BIGINTCodecTest.java | 100 ++++++++++++++++++ ...itetimeTest.java => WritetimeTTLTest.java} | 21 +++- 14 files changed, 458 insertions(+), 35 deletions(-) create mode 100644 src/main/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodec.java create mode 100644 src/main/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodec.java create mode 100644 src/test/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodecTest.java create mode 100644 src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java rename src/test/java/com/datastax/cdm/feature/{TTLAndWritetimeTest.java => WritetimeTTLTest.java} (96%) diff --git a/src/main/java/com/datastax/cdm/cql/EnhancedSession.java b/src/main/java/com/datastax/cdm/cql/EnhancedSession.java index 7aeb7175..f311b828 100644 --- a/src/main/java/com/datastax/cdm/cql/EnhancedSession.java +++ b/src/main/java/com/datastax/cdm/cql/EnhancedSession.java @@ -15,6 +15,8 @@ */ package com.datastax.cdm.cql; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.slf4j.Logger; @@ -22,17 +24,18 @@ import com.datastax.cdm.cql.codec.CodecFactory; import com.datastax.cdm.cql.codec.Codecset; -import com.datastax.cdm.cql.statement.*; +import com.datastax.cdm.cql.statement.OriginSelectByPKStatement; +import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement; +import com.datastax.cdm.cql.statement.TargetInsertStatement; +import com.datastax.cdm.cql.statement.TargetSelectByPKStatement; +import com.datastax.cdm.cql.statement.TargetUpdateStatement; +import com.datastax.cdm.cql.statement.TargetUpsertStatement; import com.datastax.cdm.data.PKFactory; import com.datastax.cdm.properties.KnownProperties; import com.datastax.cdm.properties.PropertyHelper; import com.datastax.cdm.schema.CqlTable; import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.type.DataType; -import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException; -import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry; -import com.datastax.oss.driver.api.core.type.reflect.GenericType; public class EnhancedSession { public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); @@ -96,26 +99,16 @@ public TargetUpsertStatement getTargetUpsertStatement() { } private CqlSession initSession(PropertyHelper propertyHelper, CqlSession session) { - List codecList = propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS); - if (null != codecList && !codecList.isEmpty()) { - MutableCodecRegistry registry = 
(MutableCodecRegistry) session.getContext().getCodecRegistry(); - - for (String codecString : codecList) { - Codecset codecEnum = Codecset.valueOf(codecString); - for (TypeCodec codec : CodecFactory.getCodecPair(propertyHelper, codecEnum)) { - DataType dataType = codec.getCqlType(); - GenericType javaType = codec.getJavaType(); - if (logDebug) - logger.debug("Registering Codec {} for CQL type {} and Java type {}", - codec.getClass().getSimpleName(), dataType, javaType); - try { - registry.codecFor(dataType, javaType); - } catch (CodecNotFoundException e) { - registry.register(codec); - } - } - } - } + // BIGINT_BIGINTEGER codec is always needed to compare C* writetimes in collection columns + List codecList = new ArrayList<>(Arrays.asList("BIGINT_BIGINTEGER")); + + if (null != propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS)) + codecList.addAll(propertyHelper.getStringList(KnownProperties.TRANSFORM_CODECS)); + MutableCodecRegistry registry = (MutableCodecRegistry) session.getContext().getCodecRegistry(); + + codecList.stream().map(Codecset::valueOf).map(codec -> CodecFactory.getCodecPair(propertyHelper, codec)) + .flatMap(List::stream).forEach(registry::register); + return session; } diff --git a/src/main/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodec.java b/src/main/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodec.java new file mode 100644 index 00000000..b990593e --- /dev/null +++ b/src/main/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodec.java @@ -0,0 +1,70 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datastax.cdm.cql.codec; + +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.jetbrains.annotations.NotNull; + +import com.datastax.cdm.properties.PropertyHelper; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; +import com.datastax.oss.driver.api.core.type.reflect.GenericType; + +public class BIGINT_BigIntegerCodec extends AbstractBaseCodec { + + public BIGINT_BigIntegerCodec(PropertyHelper propertyHelper) { + super(propertyHelper); + } + + @Override + public @NotNull GenericType getJavaType() { + return GenericType.BIG_INTEGER; + } + + @Override + public @NotNull DataType getCqlType() { + return DataTypes.BIGINT; + } + + @Override + public ByteBuffer encode(BigInteger value, @NotNull ProtocolVersion protocolVersion) { + if (value == null) { + return null; + } else { + return TypeCodecs.BIGINT.encode(value.longValue(), protocolVersion); + } + } + + @Override + public BigInteger decode(ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) { + return BigInteger.valueOf(TypeCodecs.BIGINT.decode(bytes, protocolVersion)); + } + + @Override + public @NotNull String format(BigInteger value) { + return TypeCodecs.BIGINT.format(value.longValue()); + } + + @Override + public BigInteger parse(String value) { + return BigInteger.valueOf(TypeCodecs.BIGINT.parse(value)); + } + +} diff --git a/src/main/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodec.java b/src/main/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodec.java new file mode 100644 index 00000000..40429341 --- /dev/null +++ b/src/main/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodec.java @@ -0,0 +1,69 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datastax.cdm.cql.codec; + +import java.nio.ByteBuffer; + +import org.jetbrains.annotations.NotNull; + +import com.datastax.cdm.properties.PropertyHelper; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; +import com.datastax.oss.driver.api.core.type.reflect.GenericType; + +public class BigInteger_BIGINTCodec extends AbstractBaseCodec { + + public BigInteger_BIGINTCodec(PropertyHelper propertyHelper) { + super(propertyHelper); + } + + @Override + public @NotNull GenericType getJavaType() { + return GenericType.INTEGER; + } + + @Override + public @NotNull DataType getCqlType() { + return DataTypes.INT; + } + + @Override + public ByteBuffer encode(Integer value, @NotNull ProtocolVersion protocolVersion) { + if (value == null) { + return null; + } else { + return TypeCodecs.INT.encode(value, protocolVersion); + } + } + + @Override + public Integer decode(ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) { + return TypeCodecs.INT.decode(bytes, protocolVersion); + } + + @Override + public @NotNull String format(Integer value) { + return TypeCodecs.INT.format(value); + } + + @Override + public Integer parse(String value) { + return TypeCodecs.INT.parse(value); + } + +} diff --git a/src/main/java/com/datastax/cdm/cql/codec/CodecFactory.java b/src/main/java/com/datastax/cdm/cql/codec/CodecFactory.java index c4e7387b..f9a06da2 100644 --- a/src/main/java/com/datastax/cdm/cql/codec/CodecFactory.java +++ b/src/main/java/com/datastax/cdm/cql/codec/CodecFactory.java @@ -34,6 +34,9 @@ public static List> getCodecPair(PropertyHelper propertyHelper, Cod return Arrays.asList(new DOUBLE_StringCodec(propertyHelper), new TEXT_DoubleCodec(propertyHelper)); case BIGINT_STRING: return Arrays.asList(new BIGINT_StringCodec(propertyHelper), new TEXT_LongCodec(propertyHelper)); + case BIGINT_BIGINTEGER: + return Arrays.asList(new BIGINT_BigIntegerCodec(propertyHelper), + new BigInteger_BIGINTCodec(propertyHelper)); case STRING_BLOB: return Arrays.asList(new TEXT_BLOBCodec(propertyHelper), new BLOB_TEXTCodec(propertyHelper)); case ASCII_BLOB: diff --git a/src/main/java/com/datastax/cdm/cql/codec/Codecset.java b/src/main/java/com/datastax/cdm/cql/codec/Codecset.java index 07739075..2c88b5b4 100644 --- a/src/main/java/com/datastax/cdm/cql/codec/Codecset.java +++ b/src/main/java/com/datastax/cdm/cql/codec/Codecset.java @@ -16,6 +16,6 @@ package com.datastax.cdm.cql.codec; public enum Codecset { - INT_STRING, DOUBLE_STRING, BIGINT_STRING, DECIMAL_STRING, TIMESTAMP_STRING_MILLIS, TIMESTAMP_STRING_FORMAT, - POINT_TYPE, POLYGON_TYPE, DATE_RANGE, LINE_STRING, STRING_BLOB, ASCII_BLOB + INT_STRING, DOUBLE_STRING, BIGINT_STRING, BIGINT_BIGINTEGER, DECIMAL_STRING, TIMESTAMP_STRING_MILLIS, + TIMESTAMP_STRING_FORMAT, POINT_TYPE, POLYGON_TYPE, DATE_RANGE, LINE_STRING, STRING_BLOB, ASCII_BLOB } diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java index 3ef787cf..0e224dfe 100644 --- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java +++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java @@ -15,6 +15,7 @@ */ package com.datastax.cdm.feature; +import java.math.BigInteger; import java.time.Instant; import java.util.*; import java.util.stream.Collectors; @@ -45,6 +46,7 @@ public class WritetimeTTL extends AbstractFeature { private Long 
filterMax; private boolean hasWriteTimestampFilter; private Long writetimeIncrement; + private boolean allowCollectionsForWritetimeTTL; @Override public boolean loadProperties(IPropertyHelper propertyHelper) { @@ -61,7 +63,7 @@ public boolean loadProperties(IPropertyHelper propertyHelper) { logger.info("PARAM -- WriteTimestampCols: {}", writetimeNames); this.autoWritetimeNames = false; } - + allowCollectionsForWritetimeTTL = propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS); this.customWritetime = getCustomWritetime(propertyHelper); if (this.customWritetime > 0) { logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.TRANSFORM_CUSTOM_WRITETIME, customWritetime, @@ -233,11 +235,25 @@ public Long getLargestWriteTimeStamp(Row row) { return this.customWritetime; if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty()) return null; - OptionalLong max = this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).filter(Objects::nonNull) - .max(); + + OptionalLong max = (allowCollectionsForWritetimeTTL) ? getMaxWriteTimeStampForCollections(row) + : getMaxWriteTimeStamp(row); + return max.isPresent() ? max.getAsLong() + this.writetimeIncrement : null; } + private OptionalLong getMaxWriteTimeStampForCollections(Row row) { + return this.writetimeSelectColumnIndexes.stream().map(col -> { + if (row.getType(col).equals(DataTypes.BIGINT)) + return Arrays.asList(row.getLong(col)); + return row.getList(col, BigInteger.class).stream().map(BigInteger::longValue).collect(Collectors.toList()); + }).flatMap(List::stream).mapToLong(Long::longValue).max(); + } + + private OptionalLong getMaxWriteTimeStamp(Row row) { + return this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).filter(Objects::nonNull).max(); + } + public Integer getLargestTTL(Row row) { if (logDebug) logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes); @@ -245,8 +261,22 @@ public Integer getLargestTTL(Row row) { return this.customTTL.intValue(); if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty()) return null; - OptionalInt max = this.ttlSelectColumnIndexes.stream().mapToInt(row::getInt).filter(Objects::nonNull).max(); - return max.isPresent() ? max.getAsInt() : null; + + OptionalInt max = (allowCollectionsForWritetimeTTL) ? getMaxTTLForCollections(row) : getMaxTTL(row); + + return max.isPresent() ? 
max.getAsInt() : 0; + } + + private OptionalInt getMaxTTLForCollections(Row row) { + return this.ttlSelectColumnIndexes.stream().map(col -> { + if (row.getType(col).equals(DataTypes.INT)) + return Arrays.asList(row.getInt(col)); + return row.getList(col, Integer.class).stream().collect(Collectors.toList()); + }).flatMap(List::stream).filter(Objects::nonNull).mapToInt(Integer::intValue).max(); + } + + private OptionalInt getMaxTTL(Row row) { + return this.ttlSelectColumnIndexes.stream().mapToInt(row::getInt).filter(Objects::nonNull).max(); } private void validateTTLColumns(CqlTable originTable) { diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java index b35d83dd..d883c967 100644 --- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java +++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java @@ -78,6 +78,7 @@ public enum PropertyType { public static final String ORIGIN_TTL_NAMES = "spark.cdm.schema.origin.column.ttl.names"; public static final String ORIGIN_WRITETIME_AUTO = "spark.cdm.schema.origin.column.writetime.automatic"; public static final String ORIGIN_WRITETIME_NAMES = "spark.cdm.schema.origin.column.writetime.names"; + public static final String ALLOW_COLL_FOR_WRITETIME_TTL_COLS = "spark.cdm.schema.origin.column.ttlwritetime.allow.collections"; public static final String ORIGIN_COLUMN_NAMES_TO_TARGET = "spark.cdm.schema.origin.column.names.to.target"; @@ -90,6 +91,8 @@ public enum PropertyType { types.put(ORIGIN_WRITETIME_NAMES, PropertyType.STRING_LIST); types.put(ORIGIN_WRITETIME_AUTO, PropertyType.BOOLEAN); defaults.put(ORIGIN_WRITETIME_AUTO, "true"); + types.put(ALLOW_COLL_FOR_WRITETIME_TTL_COLS, PropertyType.BOOLEAN); + defaults.put(ALLOW_COLL_FOR_WRITETIME_TTL_COLS, "false"); types.put(ORIGIN_COLUMN_NAMES_TO_TARGET, PropertyType.STRING_LIST); } diff --git a/src/main/java/com/datastax/cdm/schema/CqlTable.java b/src/main/java/com/datastax/cdm/schema/CqlTable.java index f2dccc42..7c75770b 100644 --- a/src/main/java/com/datastax/cdm/schema/CqlTable.java +++ b/src/main/java/com/datastax/cdm/schema/CqlTable.java @@ -470,15 +470,19 @@ private void setCqlMetadata(CqlSession cqlSession) { .filter(md -> !extractJsonExclusive || md.getName().asCql(true).endsWith(columnName)) .collect(Collectors.toCollection(() -> this.cqlAllColumns)); + boolean allowCollectionsForWritetimeTTL = propertyHelper + .getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS); this.writetimeTTLColumns = tableMetadata.getColumns().values().stream() - .filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata)) + .filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata, + allowCollectionsForWritetimeTTL)) .map(ColumnMetadata::getName).map(CqlIdentifier::asInternal).collect(Collectors.toList()); this.columnNameToCqlTypeMap = this.cqlAllColumns.stream().collect( Collectors.toMap(columnMetadata -> columnMetadata.getName().asInternal(), ColumnMetadata::getType)); } - private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata) { + private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata, + boolean allowCollectionsForWritetimeTTL) { DataType dataType = columnMetadata.getType(); boolean isKeyColumn = tableMetadata.getPartitionKey().contains(columnMetadata) || tableMetadata.getClusteringColumns().containsKey(columnMetadata); @@ -492,6 +496,8 @@ private boolean 
canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnM // supported here? if (CqlData.isFrozen(dataType)) return true; + if (allowCollectionsForWritetimeTTL && CqlData.isCollection(dataType)) + return true; return false; } diff --git a/src/test/java/com/datastax/cdm/cql/codec/ASCII_BLOBCodecTest.java b/src/test/java/com/datastax/cdm/cql/codec/ASCII_BLOBCodecTest.java index 9d79f5b7..9ec688d9 100644 --- a/src/test/java/com/datastax/cdm/cql/codec/ASCII_BLOBCodecTest.java +++ b/src/test/java/com/datastax/cdm/cql/codec/ASCII_BLOBCodecTest.java @@ -19,10 +19,12 @@ import java.nio.ByteBuffer; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; public class ASCII_BLOBCodecTest { private final String INPUT = "Encode this Text string to Blob"; @@ -42,4 +44,11 @@ public void encodeDecode() { assertEquals(retBuffer, codec.parse(INPUT)); } + @Test + void testFormat() { + String expected = TypeCodecs.ASCII.format(INPUT); + + String result = codec.format(ByteBuffer.wrap(INPUT.getBytes())); + Assertions.assertEquals(expected, result); + } } diff --git a/src/test/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodecTest.java b/src/test/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodecTest.java new file mode 100644 index 00000000..bea816b3 --- /dev/null +++ b/src/test/java/com/datastax/cdm/cql/codec/BIGINT_BigIntegerCodecTest.java @@ -0,0 +1,100 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.datastax.cdm.cql.codec;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.cdm.data.CqlConversion;
+import com.datastax.cdm.properties.PropertyHelper;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+
+class BIGINT_BigIntegerCodecTest {
+
+    private BIGINT_BigIntegerCodec codec;
+
+    @BeforeEach
+    void setUp() {
+        codec = new BIGINT_BigIntegerCodec(null);
+    }
+
+    @AfterEach
+    void tearDown() {
+        PropertyHelper.destroyInstance();
+    }
+
+    @Test
+    void getJavaType_ShouldReturnBigIntegerType() {
+        Assertions.assertEquals(GenericType.BIG_INTEGER, codec.getJavaType());
+    }
+
+    @Test
+    void getCqlType_ShouldReturnBigIntType() {
+        Assertions.assertEquals(DataTypes.BIGINT, codec.getCqlType());
+    }
+
+    @Test
+    void encode_ShouldReturnNull_WhenValueIsNull() {
+        ByteBuffer result = codec.encode(null, CqlConversion.PROTOCOL_VERSION);
+        Assertions.assertNull(result);
+    }
+
+    @Test
+    void encode_ShouldEncodeBigIntegerToByteBuffer_WhenValueIsNotNull() {
+        ByteBuffer expected = TypeCodecs.BIGINT.encode(101L, CqlConversion.PROTOCOL_VERSION);
+
+        ByteBuffer result = codec.encode(BigInteger.valueOf(101L), CqlConversion.PROTOCOL_VERSION);
+        CodecTestHelper.assertByteBufferEquals(expected, result);
+    }
+
+    @Test
+    void decode_ShouldDecodeByteBufferToBigInteger() {
+        ByteBuffer byteBuffer = TypeCodecs.BIGINT.encode(101L, CqlConversion.PROTOCOL_VERSION);
+
+        BigInteger result = codec.decode(byteBuffer, CqlConversion.PROTOCOL_VERSION);
+        Assertions.assertEquals(BigInteger.valueOf(101L), result);
+    }
+
+    @Test
+    void testFormat() {
+        String valueAsString = "9223372036854775807";
+        Long value = Long.parseLong(valueAsString);
+        String expected = TypeCodecs.BIGINT.format(value);
+
+        String result = codec.format(BigInteger.valueOf(9223372036854775807L));
+        Assertions.assertEquals(expected, result);
+    }
+
+    @Test
+    void testParse() {
+        String valueAsString = "9223372036854775807";
+        BigInteger result = codec.parse(valueAsString);
+        Assertions.assertEquals(BigInteger.valueOf(9223372036854775807L), result);
+    }
+
+    @Test
+    void parse_ShouldThrowIllegalArgumentException_WhenValueIsNotANumber() {
+        String valueAsString = "not a number";
+        Assertions.assertThrows(IllegalArgumentException.class, () -> codec.parse(valueAsString));
+    }
+}
diff --git a/src/test/java/com/datastax/cdm/cql/codec/BLOB_ASCIICodecTest.java b/src/test/java/com/datastax/cdm/cql/codec/BLOB_ASCIICodecTest.java
index 85486534..3a927cd5 100644
--- a/src/test/java/com/datastax/cdm/cql/codec/BLOB_ASCIICodecTest.java
+++ b/src/test/java/com/datastax/cdm/cql/codec/BLOB_ASCIICodecTest.java
@@ -19,10 +19,12 @@
 
 import java.nio.ByteBuffer;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
 
 public class BLOB_ASCIICodecTest {
     private final String INPUT = "Encode this Text string to Blob";
@@ -41,4 +43,12 @@ public void testEncode() {
         assertEquals(INPUT, retBuffer);
     }
 
+    @Test
+    void testFormat() {
+        ByteBuffer bb = ByteBuffer.wrap(INPUT.getBytes());
+        String expected = TypeCodecs.BLOB.format(bb);
+
+        String result =
codec.format(INPUT); + Assertions.assertEquals(expected, result); + } } diff --git a/src/test/java/com/datastax/cdm/cql/codec/BLOB_TextCodecTest.java b/src/test/java/com/datastax/cdm/cql/codec/BLOB_TextCodecTest.java index 7a6707df..385b1700 100644 --- a/src/test/java/com/datastax/cdm/cql/codec/BLOB_TextCodecTest.java +++ b/src/test/java/com/datastax/cdm/cql/codec/BLOB_TextCodecTest.java @@ -19,10 +19,12 @@ import java.nio.ByteBuffer; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; public class BLOB_TextCodecTest { private final String INPUT = "Encode this Text string to Blob"; @@ -41,4 +43,13 @@ public void testEncode() { assertEquals(INPUT, retBuffer); } + @Test + void testFormat() { + ByteBuffer bb = ByteBuffer.wrap(INPUT.getBytes()); + String expected = TypeCodecs.BLOB.format(bb); + + String result = codec.format(INPUT); + Assertions.assertEquals(expected, result); + } + } diff --git a/src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java b/src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java new file mode 100644 index 00000000..e6cf4fcf --- /dev/null +++ b/src/test/java/com/datastax/cdm/cql/codec/BigInteger_BIGINTCodecTest.java @@ -0,0 +1,100 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.datastax.cdm.cql.codec;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.datastax.cdm.data.CqlConversion;
+import com.datastax.cdm.properties.PropertyHelper;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+
+class BigInteger_BIGINTCodecTest {
+
+    private BigInteger_BIGINTCodec codec;
+
+    @BeforeEach
+    void setUp() {
+        codec = new BigInteger_BIGINTCodec(null);
+    }
+
+    @AfterEach
+    void tearDown() {
+        PropertyHelper.destroyInstance();
+    }
+
+    @Test
+    void getJavaType_ShouldReturnIntegerType() {
+        Assertions.assertEquals(GenericType.INTEGER, codec.getJavaType());
+    }
+
+    @Test
+    void getCqlType_ShouldReturnIntType() {
+        Assertions.assertEquals(DataTypes.INT, codec.getCqlType());
+    }
+
+    @Test
+    void encode_ShouldReturnNull_WhenValueIsNull() {
+        ByteBuffer result = codec.encode(null, CqlConversion.PROTOCOL_VERSION);
+        Assertions.assertNull(result);
+    }
+
+    @Test
+    void encode_ShouldEncodeIntegerToByteBuffer_WhenValueIsNotNull() {
+        ByteBuffer expected = TypeCodecs.INT.encode(Integer.valueOf(101), CqlConversion.PROTOCOL_VERSION);
+
+        ByteBuffer result = codec.encode(Integer.valueOf(101), CqlConversion.PROTOCOL_VERSION);
+        CodecTestHelper.assertByteBufferEquals(expected, result);
+    }
+
+    @Test
+    void decode_ShouldDecodeByteBufferToInteger() {
+        ByteBuffer byteBuffer = TypeCodecs.INT.encode(101, CqlConversion.PROTOCOL_VERSION);
+
+        Integer result = codec.decode(byteBuffer, CqlConversion.PROTOCOL_VERSION);
+        Assertions.assertEquals(Integer.valueOf(101), result);
+    }
+
+    @Test
+    void format_ShouldFormatValueAsString() {
+        String valueAsString = "999999";
+        Integer value = Integer.parseInt(valueAsString);
+        String expected = TypeCodecs.INT.format(value);
+
+        String result = codec.format(Integer.valueOf(valueAsString));
+        Assertions.assertEquals(expected, result);
+    }
+
+    @Test
+    void parse_ShouldParseStringToInteger() {
+        String valueAsString = "999999";
+        Integer result = codec.parse(valueAsString);
+        Assertions.assertEquals(Integer.valueOf(valueAsString), result);
+    }
+
+    @Test
+    void parse_ShouldThrowIllegalArgumentException_WhenValueIsNotANumber() {
+        String valueAsString = "not a number";
+        Assertions.assertThrows(IllegalArgumentException.class, () -> codec.parse(valueAsString));
+    }
+}
diff --git a/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
similarity index 96%
rename from src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java
rename to src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
index 101a0ed1..9955b93d 100644
--- a/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java
+++ b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
@@ -20,6 +20,7 @@
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.*;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,7 +32,7 @@
 import com.datastax.cdm.properties.KnownProperties;
 import com.datastax.oss.driver.api.core.type.DataTypes;
 
-public class TTLAndWritetimeTest extends CommonMocks {
+public class WritetimeTTLTest extends CommonMocks {
     WritetimeTTL feature;
 
     Long customWritetime = 123456789L;
@@ -364,6 +365,24 @@ public void getLargestWriteTimeStampTest() {
         assertEquals(3000L, largestWritetime);
     }
 
+    @Test
+    public void getLargestWriteTimeStampWithListTest() {
+        when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS)).thenReturn(true);
+        when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
+        when(originTable.indexOf("WRITETIME("+writetimeColumnName+")")).thenReturn(100);
+        when(originRow.getType(eq(100))).thenReturn(DataTypes.listOf(DataTypes.BIGINT));
+        when(originRow.getList(eq(100), eq(BigInteger.class))).thenReturn(Arrays.asList(BigInteger.valueOf(4001), BigInteger.valueOf(2001)));
+        when(originTable.indexOf("WRITETIME("+writetimeTTLColumnName+")")).thenReturn(101);
+        when(originRow.getType(eq(101))).thenReturn(DataTypes.BIGINT);
+        when(originRow.getLong(eq(101))).thenReturn(1000L);
+
+        feature.loadProperties(propertyHelper);
+        feature.initializeAndValidate(originTable, targetTable);
+
+        Long largestWritetime = feature.getLargestWriteTimeStamp(originRow);
+        assertEquals(4001L, largestWritetime);
+    }
+
     @Test
     public void getLargestWriteTimeStampWithCustomTimeTest() {
         when(originTable.indexOf("WRITETIME("+writetimeColumnName+")")).thenReturn(100);

From 6ee64340c39d80e3b0f2c47f103a1be79c450d23 Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Fri, 18 Oct 2024 12:07:36 -0400
Subject: [PATCH 2/3] Additional tests, fixes and docs

---
 README.md                                          |  6 +++---
 RELEASE.md                                         |  3 +++
 .../com/datastax/cdm/feature/WritetimeTTL.java     | 11 ++++++-----
 src/resources/cdm-detailed.properties              | 14 ++++++++++----
 .../datastax/cdm/feature/WritetimeTTLTest.java     | 17 +++++++++++++++++
 5 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index cd7c56a6..6020bfa0 100644
--- a/README.md
+++ b/README.md
@@ -146,9 +146,9 @@ spark-submit --properties-file cdm.properties \
 # Things to know
 - Each run (Migration or Validation) can be tracked (when enabled). You can find summary and details of the same in tables `cdm_run_info` and `cdm_run_details` in the target keyspace.
 - CDM does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
-- CDM ignores `ttl` & `writetime` on collection and UDT fields while computing the highest value
-- If a table has only collection and/or UDT non-key columns and not table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry.
-- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. Alternatively if needed, the param `spark.cdm.transform.custom.writetime` can be used to set a static custom value for `writetime`.
+- CDM excludes collection and UDT fields from `ttl` & `writetime` calculations by default, for performance reasons. If you want to include such fields, set the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true`.
+- If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
+- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
 - When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.

diff --git a/RELEASE.md b/RELEASE.md
index fd6cc0b8..d4caca1b 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,4 +1,7 @@
 # Release Notes
+## [4.6.0] - 2024-10-18
+- Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs.
+
 ## [4.5.1] - 2024-10-11
 - Made CDM generated SCB unique & much short-lived when using the TLS option to connect to Astra more securely.

diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
index 0e224dfe..8dd26134 100644
--- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
+++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -246,12 +246,13 @@ private OptionalLong getMaxWriteTimeStampForCollections(Row row) {
         return this.writetimeSelectColumnIndexes.stream().map(col -> {
             if (row.getType(col).equals(DataTypes.BIGINT))
                 return Arrays.asList(row.getLong(col));
-            return row.getList(col, BigInteger.class).stream().map(BigInteger::longValue).collect(Collectors.toList());
-        }).flatMap(List::stream).mapToLong(Long::longValue).max();
+            return row.getList(col, BigInteger.class).stream().filter(Objects::nonNull).map(BigInteger::longValue)
+                    .collect(Collectors.toList());
+        }).flatMap(List::stream).filter(Objects::nonNull).mapToLong(Long::longValue).max();
     }
 
     private OptionalLong getMaxWriteTimeStamp(Row row) {
-        return this.writetimeSelectColumnIndexes.stream().mapToLong(row::getLong).filter(Objects::nonNull).max();
+        return this.writetimeSelectColumnIndexes.stream().filter(Objects::nonNull).mapToLong(row::getLong).max();
     }
 
     public Integer getLargestTTL(Row row) {
@@ -271,12 +272,12 @@ private OptionalInt getMaxTTLForCollections(Row row) {
         return this.ttlSelectColumnIndexes.stream().map(col -> {
             if (row.getType(col).equals(DataTypes.INT))
                 return Arrays.asList(row.getInt(col));
-            return row.getList(col, Integer.class).stream().collect(Collectors.toList());
+            return row.getList(col, Integer.class).stream().filter(Objects::nonNull).collect(Collectors.toList());
         }).flatMap(List::stream).filter(Objects::nonNull).mapToInt(Integer::intValue).max();
     }
 
     private OptionalInt getMaxTTL(Row row) {
-        return
this.ttlSelectColumnIndexes.stream().mapToInt(row::getInt).filter(Objects::nonNull).max(); + return this.ttlSelectColumnIndexes.stream().filter(Objects::nonNull).mapToInt(row::getInt).max(); } private void validateTTLColumns(CqlTable originTable) { diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties index 96f1438d..a02fbd8c 100644 --- a/src/resources/cdm-detailed.properties +++ b/src/resources/cdm-detailed.properties @@ -94,27 +94,33 @@ spark.cdm.connect.target.password cassandra # the WRITETIME of the target record. # # Other Parameters: -# spark.cdm.schema.origin -# .column +# spark.cdm.schema +# .origin.column # .names.to.target : Default is empty. If column names are changed between Origin and Target, then # this map-like list provides a mechanism to associate the two. The format is # origin_column_name:target_column_name. The list is comma-separated. Only renamed # columns need to be listed. +# +# .ttlwritetime.calc +# .useCollections : Default is false. When true, TTL and WRITETIME max calculations will include +# collections and UDTs. This is useful when the only non-PK columns are collections +# and/or UDTs. #----------------------------------------------------------------------------------------------------------- spark.cdm.schema.origin.keyspaceTable keyspace_name.table_name # Max TTL value of all non-PK columns will be used for insert on target -#spark.cdm.schema.origin.column.ttl.automatic true +spark.cdm.schema.origin.column.ttl.automatic true # Max TTL value of specified non-PK columns will be used for insert on target (overrides automatic setting) #spark.cdm.schema.origin.column.ttl.names data_col1,data_col2,... # Max WRITETIME value of all non-PK columns will be used for insert on target -#spark.cdm.schema.origin.column.writetime.automatic true +spark.cdm.schema.origin.column.writetime.automatic true # Max WRITETIME value of specified non-PK columns will be used for insert on target (overrides automatic setting) #spark.cdm.schema.origin.column.writetime.names data_col1,data_col2,... #spark.cdm.schema.origin.column.names.to.target partition_col1:partition_col_1,partition_col2:partition_col_2,... 
+spark.cdm.schema.ttlwritetime.calc.useCollections false #=========================================================================================================== # Details about the Target Schema diff --git a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java index 9955b93d..8ad14f2b 100644 --- a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java +++ b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java @@ -410,6 +410,23 @@ public void getLargestTTLTest() { assertEquals(30, largestTTL); } + @Test + public void getLargestTTLWithListTest() { + when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS)).thenReturn(true); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); + when(originTable.indexOf("TTL("+ttlColumnName+")")).thenReturn(100); + when(originRow.getType(eq(100))).thenReturn(DataTypes.listOf(DataTypes.INT)); + when(originRow.getList(eq(100), eq(Integer.class))).thenReturn(Arrays.asList(Integer.valueOf(40), Integer.valueOf(10))); + when(originTable.indexOf("TTL("+writetimeTTLColumnName+")")).thenReturn(101); + when(originRow.getType(eq(101))).thenReturn(DataTypes.INT); + when(originRow.getInt(eq(101))).thenReturn(20); + + feature.loadProperties(propertyHelper); + feature.initializeAndValidate(originTable, targetTable); + Integer largestTTL = feature.getLargestTTL(originRow); + assertEquals(40, largestTTL); + } + @Test public void validateInvalidFilterMin() { when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(-1L); From b5ddaecea9863dc6758dcdce928bd9d6c1ce675a Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Fri, 18 Oct 2024 12:20:20 -0400 Subject: [PATCH 3/3] Fixed naming --- src/main/java/com/datastax/cdm/feature/WritetimeTTL.java | 2 +- .../java/com/datastax/cdm/properties/KnownProperties.java | 6 +++--- src/main/java/com/datastax/cdm/schema/CqlTable.java | 2 +- .../java/com/datastax/cdm/feature/WritetimeTTLTest.java | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java index 8dd26134..98089f86 100644 --- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java +++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java @@ -63,7 +63,7 @@ public boolean loadProperties(IPropertyHelper propertyHelper) { logger.info("PARAM -- WriteTimestampCols: {}", writetimeNames); this.autoWritetimeNames = false; } - allowCollectionsForWritetimeTTL = propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS); + allowCollectionsForWritetimeTTL = propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC); this.customWritetime = getCustomWritetime(propertyHelper); if (this.customWritetime > 0) { logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.TRANSFORM_CUSTOM_WRITETIME, customWritetime, diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java index d883c967..91b332b0 100644 --- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java +++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java @@ -78,7 +78,7 @@ public enum PropertyType { public static final String ORIGIN_TTL_NAMES = "spark.cdm.schema.origin.column.ttl.names"; public static final String ORIGIN_WRITETIME_AUTO = "spark.cdm.schema.origin.column.writetime.automatic"; public 
static final String ORIGIN_WRITETIME_NAMES = "spark.cdm.schema.origin.column.writetime.names"; - public static final String ALLOW_COLL_FOR_WRITETIME_TTL_COLS = "spark.cdm.schema.origin.column.ttlwritetime.allow.collections"; + public static final String ALLOW_COLL_FOR_WRITETIME_TTL_CALC = "spark.cdm.schema.ttlwritetime.calc.useCollections"; public static final String ORIGIN_COLUMN_NAMES_TO_TARGET = "spark.cdm.schema.origin.column.names.to.target"; @@ -91,8 +91,8 @@ public enum PropertyType { types.put(ORIGIN_WRITETIME_NAMES, PropertyType.STRING_LIST); types.put(ORIGIN_WRITETIME_AUTO, PropertyType.BOOLEAN); defaults.put(ORIGIN_WRITETIME_AUTO, "true"); - types.put(ALLOW_COLL_FOR_WRITETIME_TTL_COLS, PropertyType.BOOLEAN); - defaults.put(ALLOW_COLL_FOR_WRITETIME_TTL_COLS, "false"); + types.put(ALLOW_COLL_FOR_WRITETIME_TTL_CALC, PropertyType.BOOLEAN); + defaults.put(ALLOW_COLL_FOR_WRITETIME_TTL_CALC, "false"); types.put(ORIGIN_COLUMN_NAMES_TO_TARGET, PropertyType.STRING_LIST); } diff --git a/src/main/java/com/datastax/cdm/schema/CqlTable.java b/src/main/java/com/datastax/cdm/schema/CqlTable.java index 7c75770b..5c77bd33 100644 --- a/src/main/java/com/datastax/cdm/schema/CqlTable.java +++ b/src/main/java/com/datastax/cdm/schema/CqlTable.java @@ -471,7 +471,7 @@ private void setCqlMetadata(CqlSession cqlSession) { .collect(Collectors.toCollection(() -> this.cqlAllColumns)); boolean allowCollectionsForWritetimeTTL = propertyHelper - .getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS); + .getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC); this.writetimeTTLColumns = tableMetadata.getColumns().values().stream() .filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata, allowCollectionsForWritetimeTTL)) diff --git a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java index 8ad14f2b..6841dbde 100644 --- a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java +++ b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java @@ -367,7 +367,7 @@ public void getLargestWriteTimeStampTest() { @Test public void getLargestWriteTimeStampWithListTest() { - when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS)).thenReturn(true); + when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC)).thenReturn(true); when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L); when(originTable.indexOf("WRITETIME("+writetimeColumnName+")")).thenReturn(100); when(originRow.getType(eq(100))).thenReturn(DataTypes.listOf(DataTypes.BIGINT)); @@ -412,7 +412,7 @@ public void getLargestTTLTest() { @Test public void getLargestTTLWithListTest() { - when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_COLS)).thenReturn(true); + when(propertyHelper.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC)).thenReturn(true); when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); when(originTable.indexOf("TTL("+ttlColumnName+")")).thenReturn(100); when(originRow.getType(eq(100))).thenReturn(DataTypes.listOf(DataTypes.INT));
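
The patches above lean on a few mechanics that are easy to miss when reading the diffs in isolation; the sketches below spell them out. They are illustrative only: any table, column, or class name that does not appear in the patches is hypothetical.

First, why patch 1 registers the BIGINT_BIGINTEGER pair unconditionally rather than leaving it to the optional KnownProperties.TRANSFORM_CODECS list: when collections are included in writetime calculations, WRITETIME() on a multi-cell collection column comes back as list<bigint> (on server versions that permit WRITETIME on such columns), and WritetimeTTL reads it via row.getList(col, BigInteger.class). The driver can only build that list codec if a bigint<->BigInteger element codec is registered. A minimal sketch of the read path, assuming the codec pair is already registered on the session as EnhancedSession.initSession() now does (names `ks.tbl`, `tags`, `id` are made up):

    import java.math.BigInteger;
    import java.util.List;
    import java.util.Objects;

    import com.datastax.oss.driver.api.core.CqlSession;
    import com.datastax.oss.driver.api.core.cql.Row;

    public class CollectionWritetimeExample {
        public static void main(String[] args) {
            try (CqlSession session = CqlSession.builder().build()) {
                // On servers that allow WRITETIME() on multi-cell columns, the
                // result is one timestamp per cell, i.e. a list<bigint>.
                // (Null checks omitted for brevity.)
                Row row = session
                        .execute("SELECT WRITETIME(tags) AS wt FROM ks.tbl WHERE id = 1")
                        .one();
                // Resolving list<bigint> -> List<BigInteger> goes through the
                // codec registry; without a BIGINT<->BigInteger codec this call
                // fails with CodecNotFoundException.
                List<BigInteger> writetimes = row.getList("wt", BigInteger.class);
                long newest = writetimes.stream()
                        .filter(Objects::nonNull)
                        .mapToLong(BigInteger::longValue)
                        .max()
                        .orElse(0L);
                System.out.println("largest writetime: " + newest);
            }
        }
    }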
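On the codec itself: BIGINT_BigIntegerCodec.decode(), format() and parse() as committed assume non-null input, while the driver's TypeCodec contract lets decode() receive a null or empty buffer (a null column value) and format()/parse() receive null / "NULL". Whether that bites depends on the call sites, but a null-tolerant variant of the three methods is small. This is a suggested sketch, not what the patch ships; it would drop in for the corresponding methods in the new codec:

    @Override
    public BigInteger decode(ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) {
        // TypeCodecs.BIGINT.decode returns null for a null/empty buffer; guard
        // against the unboxing NPE that BigInteger.valueOf(long) would cause.
        Long decoded = TypeCodecs.BIGINT.decode(bytes, protocolVersion);
        return decoded == null ? null : BigInteger.valueOf(decoded);
    }

    @Override
    public @NotNull String format(BigInteger value) {
        // Driver codecs format a null value as the literal "NULL".
        return value == null ? "NULL" : TypeCodecs.BIGINT.format(value.longValue());
    }

    @Override
    public BigInteger parse(String value) {
        // TypeCodecs.BIGINT.parse returns null for null or "NULL" input.
        Long parsed = TypeCodecs.BIGINT.parse(value);
        return parsed == null ? null : BigInteger.valueOf(parsed);
    }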
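The core of the feature is the shape-mixing in getLargestWriteTimeStamp()/getLargestTTL(): a writetime select column is now either a scalar bigint (regular column) or a list<bigint> (multi-cell collection), and the largest value across all of them wins. A self-contained restatement of the writetime side, equivalent to getMaxWriteTimeStampForCollections() after patch 2's null filtering (the class name here is illustrative):

    import java.math.BigInteger;
    import java.util.List;
    import java.util.Objects;
    import java.util.OptionalLong;
    import java.util.stream.Stream;

    import com.datastax.oss.driver.api.core.cql.Row;
    import com.datastax.oss.driver.api.core.type.DataTypes;

    final class LargestWritetimeSketch {

        // Each index points at a SELECT column holding WRITETIME(col): either
        // a plain bigint (regular column) or a list<bigint> (collection).
        static OptionalLong largestWritetime(Row row, List<Integer> writetimeColumnIndexes) {
            return writetimeColumnIndexes.stream()
                    .flatMap(col -> row.getType(col).equals(DataTypes.BIGINT)
                            // scalar column: a single timestamp
                            ? Stream.of(row.getLong(col))
                            // collection column: one timestamp per cell, nulls dropped
                            : row.getList(col, BigInteger.class).stream()
                                    .filter(Objects::nonNull)
                                    .map(BigInteger::longValue))
                    .mapToLong(Long::longValue)
                    .max();
        }
    }

The TTL side is the same pattern over int / list<int>. Note that patch 1 also changed getLargestTTL()'s fallback from null to 0 when no TTL column yields a value; in CQL, `USING TTL 0` means "no TTL", which is presumably why 0 is a safe default there.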
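Finally, wiring it up: after patch 3's rename, the switch lives at spark.cdm.schema.ttlwritetime.calc.useCollections (default false). For a table whose only non-key columns are collections and/or UDTs, a job properties file would enable it alongside the automatic writetime/TTL discovery that cdm-detailed.properties now turns on out of the box (keyspace/table below are placeholders, as in the shipped file):

    spark.cdm.schema.origin.keyspaceTable              keyspace_name.table_name

    # Include collection/UDT columns when computing the max TTL & WRITETIME
    spark.cdm.schema.ttlwritetime.calc.useCollections  true

    # Shown for completeness; patch 2 enables these defaults in cdm-detailed.properties
    spark.cdm.schema.origin.column.ttl.automatic       true
    spark.cdm.schema.origin.column.writetime.automatic true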