Skip to content

Commit

Permalink
[kudu] pick kudu connector to 452 version. (#232)
Browse files Browse the repository at this point in the history
Migrate the changes of trino Kudu connector from version 435 to version 452.

Changed some code to adapt to the changes of Trino Spi.

Performed tpch tests on both Trino and Doris.

It is recommended to compile with mvn package -Dmaven.test.skip=true.
  • Loading branch information
hubgeter authored Jul 23, 2024
1 parent 7fee875 commit 314ac78
Show file tree
Hide file tree
Showing 34 changed files with 1,007 additions and 734 deletions.
34 changes: 27 additions & 7 deletions plugin/trino-kudu/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

<artifactId>trino-kudu</artifactId>
<packaging>trino-plugin</packaging>
<description>Trino - Kudu Connector</description>
<description>Trino - Kudu connector</description>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<kudu.version>1.15.0</kudu.version>
<air.compiler.fail-warnings>true</air.compiler.fail-warnings>
<kudu.version>1.17.0</kudu.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -131,12 +132,30 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>junit-extensions</artifactId>
Expand Down Expand Up @@ -218,20 +237,21 @@
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<groupId>org.rnorth.duct-tape</groupId>
<artifactId>duct-tape</artifactId>
<version>1.0.8</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.kudu;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
Expand All @@ -35,11 +34,11 @@
@DefunctConfig("kudu.client.default-socket-read-timeout")
public class KuduClientConfig
{
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
private static final Duration DEFAULT_OPERATION_TIMEOUT = new Duration(30, TimeUnit.SECONDS);

private List<String> masterAddresses = ImmutableList.of();
private Duration defaultAdminOperationTimeout = new Duration(30, TimeUnit.SECONDS);
private Duration defaultOperationTimeout = new Duration(30, TimeUnit.SECONDS);
private Duration defaultAdminOperationTimeout = DEFAULT_OPERATION_TIMEOUT;
private Duration defaultOperationTimeout = DEFAULT_OPERATION_TIMEOUT;
private boolean disableStatistics;
private boolean schemaEmulationEnabled;
private String schemaEmulationPrefix = "presto::";
Expand All @@ -54,15 +53,9 @@ public List<String> getMasterAddresses()
}

@Config("kudu.client.master-addresses")
public KuduClientConfig setMasterAddresses(String commaSeparatedList)
public KuduClientConfig setMasterAddresses(List<String> commaSeparatedList)
{
this.masterAddresses = SPLITTER.splitToList(commaSeparatedList);
return this;
}

public KuduClientConfig setMasterAddresses(String... contactPoints)
{
this.masterAddresses = ImmutableList.copyOf(contactPoints);
this.masterAddresses = commaSeparatedList;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.plugin.kudu.properties.ColumnDesign;
import io.trino.plugin.kudu.properties.HashPartitionDefinition;
import io.trino.plugin.kudu.properties.KuduTableProperties;
Expand Down Expand Up @@ -58,6 +60,7 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -178,7 +181,7 @@ public List<KuduSplit> buildKuduSplits(KuduTableHandle tableHandle, DynamicFilte
.boxed().collect(toList());
for (ColumnHandle column : desiredColumns.get()) {
KuduColumnHandle k = (KuduColumnHandle) column;
int index = k.getOrdinalPosition();
int index = k.ordinalPosition();
if (index >= primaryKeyColumnCount) {
columnIndexes.add(index);
}
Expand All @@ -194,7 +197,7 @@ public List<KuduSplit> buildKuduSplits(KuduTableHandle tableHandle, DynamicFilte
else {
if (desiredColumns.isPresent()) {
columnIndexes = desiredColumns.get().stream()
.map(handle -> ((KuduColumnHandle) handle).getOrdinalPosition())
.map(handle -> ((KuduColumnHandle) handle).ordinalPosition())
.collect(toImmutableList());
}
else {
Expand Down Expand Up @@ -323,12 +326,12 @@ public void addColumn(SchemaTableName schemaTableName, ColumnMetadata column)
String rawName = schemaEmulation.toRawName(schemaTableName);
AlterTableOptions alterOptions = new AlterTableOptions();
Type type = TypeHelper.toKuduClientType(column.getType());
alterOptions.addColumn(
new ColumnSchemaBuilder(column.getName(), type)
.nullable(true)
.defaultValue(null)
.comment(nullToEmpty(column.getComment())) // Kudu doesn't allow null comment
.build());
ColumnSchemaBuilder builder = new ColumnSchemaBuilder(column.getName(), type)
.nullable(true)
.defaultValue(null)
.comment(nullToEmpty(column.getComment())); // Kudu doesn't allow null comment
setTypeAttributes(column, builder);
alterOptions.addColumn(builder.build());
client.alterTable(rawName, alterOptions);
}
catch (KuduException e) {
Expand Down Expand Up @@ -384,8 +387,8 @@ private void changeRangePartition(SchemaTableName schemaTableName, RangePartitio
if (definition == null) {
throw new TrinoException(QUERY_REJECTED, "Table " + schemaTableName + " has no range partition");
}
PartialRow lowerBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.getLower());
PartialRow upperBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.getUpper());
PartialRow lowerBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.lower());
PartialRow upperBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.upper());
AlterTableOptions alterOptions = new AlterTableOptions();
switch (change) {
case ADD:
Expand Down Expand Up @@ -467,19 +470,19 @@ private CreateTableOptions buildCreateTableOptions(Schema schema, Map<String, Ob
PartitionDesign partitionDesign = KuduTableProperties.getPartitionDesign(properties);
if (partitionDesign.getHash() != null) {
for (HashPartitionDefinition partition : partitionDesign.getHash()) {
options.addHashPartitions(partition.getColumns(), partition.getBuckets());
options.addHashPartitions(partition.columns(), partition.buckets());
}
}
if (partitionDesign.getRange() != null) {
rangePartitionDefinition = partitionDesign.getRange();
options.setRangePartitionColumns(rangePartitionDefinition.getColumns());
options.setRangePartitionColumns(rangePartitionDefinition.columns());
}

List<RangePartition> rangePartitions = KuduTableProperties.getRangePartitions(properties);
if (rangePartitionDefinition != null && !rangePartitions.isEmpty()) {
for (RangePartition rangePartition : rangePartitions) {
PartialRow lower = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.getLower());
PartialRow upper = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.getUpper());
PartialRow lower = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.lower());
PartialRow upper = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.upper());
options.addRangePartition(lower, upper);
}
}
Expand All @@ -503,7 +506,7 @@ private void addConstraintPredicates(KuduTable table, KuduScanToken.KuduScanToke

Schema schema = table.getSchema();
constraintSummary.getDomains().orElseThrow().forEach((columnHandle, domain) -> {
int position = ((KuduColumnHandle) columnHandle).getOrdinalPosition();
int position = ((KuduColumnHandle) columnHandle).ordinalPosition();
ColumnSchema columnSchema = schema.getColumnByIndex(position);
verify(!domain.isNone(), "Domain is none");
if (domain.isAll()) {
Expand All @@ -529,8 +532,8 @@ else if (domain.isSingleValue()) {
KuduPredicate predicate = createInListPredicate(columnSchema, discreteValues);
builder.addPredicate(predicate);
}
else if (valueSet instanceof SortedRangeSet) {
Ranges ranges = ((SortedRangeSet) valueSet).getRanges();
else if (valueSet instanceof SortedRangeSet sortedRangeSet) {
Ranges ranges = sortedRangeSet.getRanges();
List<Range> rangeList = ranges.getOrderedRanges();
if (rangeList.stream().allMatch(Range::isSingleValue)) {
io.trino.spi.type.Type type = TypeHelper.fromKuduColumn(columnSchema);
Expand Down Expand Up @@ -577,35 +580,39 @@ private KuduPredicate createComparisonPredicate(ColumnSchema columnSchema, KuduP
{
io.trino.spi.type.Type type = TypeHelper.fromKuduColumn(columnSchema);
Object javaValue = TypeHelper.getJavaValue(type, value);
if (javaValue instanceof Long) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (Long) javaValue);
if (javaValue instanceof Long longValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, longValue);
}
if (javaValue instanceof BigDecimal) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (BigDecimal) javaValue);
if (javaValue instanceof BigDecimal bigDecimal) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, bigDecimal);
}
if (javaValue instanceof Integer) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (Integer) javaValue);
if (javaValue instanceof Integer integerValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, integerValue);
}
if (javaValue instanceof Short) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (Short) javaValue);
if (javaValue instanceof Short shortValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, shortValue);
}
if (javaValue instanceof Byte) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (Byte) javaValue);
if (javaValue instanceof Byte byteValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, byteValue);
}
if (javaValue instanceof String) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (String) javaValue);
if (javaValue instanceof String stringValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, stringValue);
}
if (javaValue instanceof Double) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (Double) javaValue);
if (javaValue instanceof Double doubleValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, doubleValue);
}
if (javaValue instanceof Float) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (Float) javaValue);
if (javaValue instanceof Float floatValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, floatValue);
}
if (javaValue instanceof Boolean) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (Boolean) javaValue);
if (javaValue instanceof Boolean booleanValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, booleanValue);
}
if (javaValue instanceof byte[]) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, (byte[]) javaValue);
if (javaValue instanceof byte[] byteArrayValue) {
return KuduPredicate.newComparisonPredicate(columnSchema, op, byteArrayValue);
}
if (javaValue instanceof ByteBuffer byteBuffer) {
Slice slice = Slices.wrappedHeapBuffer(byteBuffer);
return KuduPredicate.newComparisonPredicate(columnSchema, op, slice.getBytes(0, slice.length()));
}
if (javaValue == null) {
throw new IllegalStateException("Unexpected null java value for column " + columnSchema.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,28 @@
*/
package io.trino.plugin.kudu;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class KuduColumnHandle
public record KuduColumnHandle(String name, int ordinalPosition, Type type)
implements ColumnHandle
{
public static final String ROW_ID = "row_uuid";
public static final int ROW_ID_POSITION = -1;

public static final KuduColumnHandle ROW_ID_HANDLE = new KuduColumnHandle(ROW_ID, ROW_ID_POSITION, VarbinaryType.VARBINARY);

private final String name;
private final int ordinalPosition;
private final Type type;

@JsonCreator
public KuduColumnHandle(
@JsonProperty("name") String name,
@JsonProperty("ordinalPosition") int ordinalPosition,
@JsonProperty("type") Type type)
{
this.name = requireNonNull(name, "name is null");
this.ordinalPosition = ordinalPosition;
this.type = requireNonNull(type, "type is null");
}

@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public int getOrdinalPosition()
{
return ordinalPosition;
}

@JsonProperty
public Type getType()
public KuduColumnHandle
{
return type;
requireNonNull(name, "name is null");
requireNonNull(type, "type is null");
}

public ColumnMetadata getColumnMetadata()
public ColumnMetadata columnMetadata()
{
return new ColumnMetadata(name, type);
}
Expand All @@ -75,38 +43,4 @@ public boolean isVirtualRowId()
{
return name.equals(ROW_ID);
}

@Override
public int hashCode()
{
return Objects.hash(
name,
ordinalPosition,
type);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
KuduColumnHandle other = (KuduColumnHandle) obj;
return Objects.equals(this.name, other.name) &&
Objects.equals(this.ordinalPosition, other.ordinalPosition) &&
Objects.equals(this.type, other.type);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("name", name)
.add("ordinalPosition", ordinalPosition)
.add("type", type)
.toString();
}
}
Loading

0 comments on commit 314ac78

Please sign in to comment.