Skip to content

Commit

Permalink
Allow updating partition tables
Browse files Browse the repository at this point in the history
There are still a few bugs, but simple queries on a partitioned table
now work.

Given a table created with

CREATE TABLE t (
    a int,
    b int
 )
 WITH ( partitioning = ARRAY['a'] )

Query patterns that work:

- UPDATE t SET a = a + 1
- UPDATE t SET a = a + 1 WHERE [a, b] [>, <, =, !=] 1
- UPDATE t SET b = 1, a = a [WHERE ... ]

Patterns that don't work:

- UPDATE t SET a = 1
- UPDATE t SET b = 1
- multiple partition columns
  • Loading branch information
ZacBlanco committed Dec 31, 2024
1 parent 19ee4d9 commit 4aa34a0
Show file tree
Hide file tree
Showing 7 changed files with 432 additions and 209 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.spi.ConnectorPageSource;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
 * Immutable holder that pairs a {@link ConnectorPageSource} with an optional start
 * row position and an optional end row position.
 *
 * <p>All three constructor arguments must be non-null; use {@link Optional#empty()}
 * when a position is not available.
 *
 * <p>NOTE(review): presumably the positions describe the row range the page source
 * covers within its underlying data file -- confirm against the code that builds
 * instances of this class.
 */
public class ConnectorPageSourceWithRowPositions
{
    private final ConnectorPageSource delegate;
    private final Optional<Long> startRowPosition;
    private final Optional<Long> endRowPosition;

    /**
     * @param delegate the wrapped page source
     * @param startRowPosition the start row position, or empty if not available
     * @param endRowPosition the end row position, or empty if not available
     * @throws NullPointerException if any argument is null
     */
    public ConnectorPageSourceWithRowPositions(
            ConnectorPageSource delegate,
            Optional<Long> startRowPosition,
            Optional<Long> endRowPosition)
    {
        // Each message names the actual parameter so a failure points at the right argument.
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.startRowPosition = requireNonNull(startRowPosition, "startRowPosition is null");
        this.endRowPosition = requireNonNull(endRowPosition, "endRowPosition is null");
    }

    /**
     * @return the wrapped page source
     */
    public ConnectorPageSource getDelegate()
    {
        return delegate;
    }

    /**
     * @return the start row position supplied at construction, possibly empty
     */
    public Optional<Long> getStartRowPosition()
    {
        return startRowPosition;
    }

    /**
     * @return the end row position supplied at construction, possibly empty
     */
    public Optional<Long> getEndRowPosition()
    {
        return endRowPosition;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@

import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveUtil.PRESTO_QUERY_ID;
import static com.facebook.presto.hive.MetadataUtils.getCombinedRemainingPredicate;
import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
Expand All @@ -123,12 +124,11 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PRESTO_UPDATE_ROW_ID_COLUMN_ID;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PRESTO_UPDATE_ROW_ID_COLUMN_NAME;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBEG_COMMIT_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA;
import static com.facebook.presto.iceberg.IcebergPartitionType.ALL;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
Expand Down Expand Up @@ -1162,11 +1162,8 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect
unmodifiedColumns.add(column);
}
}
Types.NestedField field = Types.NestedField.required(PRESTO_UPDATE_ROW_ID_COLUMN_ID, PRESTO_UPDATE_ROW_ID_COLUMN_NAME, Types.StructType.of(unmodifiedColumns));
return new IcebergColumnHandle(ColumnIdentity.createColumnIdentity(field),
toPrestoType(field.type(), typeManager),
Optional.ofNullable(field.doc()),
REGULAR);
Types.NestedField field = Types.NestedField.required(UPDATE_ROW_DATA.getId(), UPDATE_ROW_DATA.getColumnName(), Types.StructType.of(unmodifiedColumns));
return IcebergColumnHandle.create(field, typeManager, SYNTHESIZED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static com.facebook.presto.iceberg.ColumnIdentity.primitiveColumnIdentity;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand All @@ -50,9 +51,8 @@ public class IcebergColumnHandle
public static final IcebergColumnHandle DATA_SEQUENCE_NUMBER_COLUMN_HANDLE = getIcebergColumnHandle(DATA_SEQUENCE_NUMBER);
public static final ColumnMetadata DATA_SEQUENCE_NUMBER_COLUMN_METADATA = getColumnMetadata(DATA_SEQUENCE_NUMBER);

// Iceberg reserved row ids begin at INTEGER.MAX_VALUE and count down. Starting with MIN_VALUE here to avoid conflicts.
public static final int PRESTO_UPDATE_ROW_ID_COLUMN_ID = Integer.MIN_VALUE;
public static final String PRESTO_UPDATE_ROW_ID_COLUMN_NAME = "$row_id";
// public static final int PRESTO_UPDATE_ROW_ID_COLUMN_ID = Integer.MIN_VALUE;
// public static final String PRESTO_UPDATE_ROW_ID_COLUMN_NAME = "$row_id";
private final ColumnIdentity columnIdentity;
private final Type type;

Expand Down Expand Up @@ -102,7 +102,7 @@ public boolean isRowPositionColumn()
@JsonIgnore
public boolean isUpdateRowIdColumn()
{
return getId() == PRESTO_UPDATE_ROW_ID_COLUMN_ID;
return columnIdentity.getId() == UPDATE_ROW_DATA.getId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,30 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.Type;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.MetadataColumns;

import java.util.Set;
import java.util.stream.Stream;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.UnknownType.UNKNOWN;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE;
import static com.facebook.presto.iceberg.ColumnIdentity.TypeCategory.STRUCT;
import static com.google.common.collect.ImmutableSet.toImmutableSet;

public enum IcebergMetadataColumn
{
FILE_PATH(MetadataColumns.FILE_PATH.fieldId(), "$path", VARCHAR, PRIMITIVE),
DATA_SEQUENCE_NUMBER(Integer.MAX_VALUE - 1001, "$data_sequence_number", BIGINT, PRIMITIVE),
/**
* Iceberg reserved field ids begin at Integer.MAX_VALUE and count down. Starting with Integer.MIN_VALUE here to avoid conflicts.
* Inner type for row is not known until runtime.
*/
UPDATE_ROW_DATA(Integer.MIN_VALUE, "$row_id", RowType.anonymous(ImmutableList.of(UNKNOWN)), STRUCT)
/**/;

private static final Set<Integer> COLUMN_IDS = Stream.of(values())
Expand Down
Loading

0 comments on commit 4aa34a0

Please sign in to comment.