Skip to content

Commit

Permalink
Add support for Iceberg table identifiers with special characters (#33293)
Browse files Browse the repository at this point in the history

* Add support for Iceberg table identifiers with special characters

* Add used undeclared dependencies

* Fix style

* Trigger iceberg integration tests
  • Loading branch information
regadas authored Jan 6, 2025
1 parent bbccf52 commit d6e0b0c
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
4 changes: 3 additions & 1 deletion sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ dependencies {
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common
runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
implementation library.java.hadoop_common
implementation library.java.jackson_core
implementation library.java.jackson_databind

testImplementation project(":sdks:java:managed")
testImplementation library.java.hadoop_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
Expand Down Expand Up @@ -134,7 +133,7 @@ public void processElement(
return;
}

Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey()));

// vast majority of the time, we will simply append data files.
// in the rare case we get a batch that contains multiple partition specs, we will group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

@AutoValue
Expand All @@ -41,7 +42,7 @@ abstract class FileWriteResult {
@SchemaIgnore
public TableIdentifier getTableIdentifier() {
  // Lazily parse the stored identifier string and cache the result; the string
  // may be either TableIdentifierParser JSON or a legacy dotted identifier
  // (IcebergUtils.parseTableIdentifier accepts both). The scraped diff retained
  // both the pre- and post-change assignment; only the committed (post-change)
  // statement is kept here.
  if (cachedTableIdentifier == null) {
    cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString());
  }
  return cachedTableIdentifier;
}
Expand All @@ -67,7 +68,7 @@ abstract static class Builder {

@SchemaIgnore
public Builder setTableIdentifier(TableIdentifier tableId) {
return setTableIdentifierString(tableId.toString());
return setTableIdentifierString(TableIdentifierParser.toJson(tableId));
}

public abstract FileWriteResult build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;

/**
* SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
Expand Down Expand Up @@ -86,7 +85,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.getPipeline()
.apply(
IcebergIO.readRows(configuration.getIcebergCatalog())
.from(TableIdentifier.parse(configuration.getTable())));
.from(IcebergUtils.parseTableIdentifier(configuration.getTable())));

return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.expressions.Expression;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -51,7 +52,9 @@ public enum ScanType {
public Table getTable() {
  // Lazily load the Iceberg table via the configured catalog and cache it.
  // parseTableIdentifier handles both JSON-encoded and dotted identifiers, so
  // table names containing '.' or other special characters resolve correctly.
  // The scraped diff retained both the old and new loadTable statements; only
  // the committed (post-change) form is kept here.
  if (cachedTable == null) {
    cachedTable =
        getCatalogConfig()
            .catalog()
            .loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier()));
  }
  return cachedTable;
}
Expand Down Expand Up @@ -126,7 +129,7 @@ public abstract static class Builder {
public abstract Builder setTableIdentifier(String tableIdentifier);

public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
  // Serialize as TableIdentifierParser JSON (not toString()) so identifiers
  // whose parts contain '.' or other special characters round-trip without
  // being re-split on dots. The scraped diff retained both the old and new
  // return statements; only the committed (post-change) one is kept here.
  return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier));
}

public Builder setTableIdentifier(String... names) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand All @@ -36,6 +39,8 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
Expand All @@ -47,6 +52,9 @@

/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
public class IcebergUtils {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private IcebergUtils() {}

private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
Expand Down Expand Up @@ -506,4 +514,13 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
// LocalDateTime, LocalDate, LocalTime
return icebergValue;
}

/**
 * Parses a table identifier from either of its two supported encodings.
 *
 * <p>The string is first treated as {@code TableIdentifierParser} JSON (e.g. {@code
 * {"namespace": [...], "name": ...}}), which preserves name parts containing special
 * characters such as dots. If it is not valid JSON, it is parsed as a legacy
 * dot-separated identifier instead.
 *
 * @param table the identifier string, JSON-encoded or dot-separated
 * @return the parsed {@link TableIdentifier}
 */
public static TableIdentifier parseTableIdentifier(String table) {
  try {
    return TableIdentifierParser.fromJson(OBJECT_MAPPER.readTree(table));
  } catch (JsonProcessingException e) {
    // Not JSON — fall back to the classic "namespace.name" form.
    return TableIdentifier.parse(table);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
@VisibleForTesting
TableIdentifier getTableIdentifier() {
  // Lazily parse the serialized identifier (JSON or dotted form) and cache it.
  // tableIdString must already be set (checked via checkStateNotNull). The
  // scraped diff retained both the pre- and post-change assignment; only the
  // committed (post-change) statement is kept here.
  if (tableId == null) {
    tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString));
  }
  return tableId;
}
Expand Down Expand Up @@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException {
@Override
public void readExternal(ObjectInput in) throws IOException {
  // Restore state written by writeExternal: a single UTF string holding the
  // serialized table identifier. Eagerly re-parse it (JSON or dotted form) so
  // tableId is usable immediately after deserialization. The scraped diff
  // retained both the old and new parse calls; only the committed
  // (post-change) statement is kept here.
  tableIdString = in.readUTF();
  tableId = IcebergUtils.parseTableIdentifier(tableIdString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

class PortableIcebergDestinations implements DynamicDestinations {
Expand Down Expand Up @@ -73,7 +72,7 @@ public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
@Override
public IcebergDestination instantiateDestination(String dest) {
return IcebergDestination.builder()
.setTableIdentifier(TableIdentifier.parse(dest))
.setTableIdentifier(IcebergUtils.parseTableIdentifier(dest))
.setTableCreateConfig(null)
.setFileFormat(FileFormat.fromString(fileFormat))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -46,11 +48,11 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
@RunWith(Parameterized.class)
public class IcebergIOReadTest {

private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class);
Expand All @@ -61,6 +63,21 @@ public class IcebergIOReadTest {

@Rule public TestPipeline testPipeline = TestPipeline.create();

// Runs every test in this class against both supported identifier encodings:
// the JSON form ({"namespace": [...], "name": ...}) and the legacy dotted form.
@Parameterized.Parameters
public static Collection<Object[]> data() {
  return Arrays.asList(
      new Object[][] {
        {String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())},
        {String.format("default.%s", tableId())},
      });
}

// Generates a quasi-unique table name ("table" + hex of a random UUID's hash)
// so repeated runs don't collide in the shared warehouse.
// NOTE(review): hashCode() can be negative, producing a '-' in the name —
// confirm the catalogs under test accept such names.
public static String tableId() {
  return "table" + Long.toString(UUID.randomUUID().hashCode(), 16);
}

// Identifier string under test, injected by the Parameterized runner.
@Parameterized.Parameter public String tableStringIdentifier;

static class PrintRow extends DoFn<Row, Row> {

@ProcessElement
Expand All @@ -72,8 +89,7 @@ public void process(@Element Row row, OutputReceiver<Row> output) throws Excepti

@Test
public void testSimpleScan() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier);
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId;
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType;
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.parseTableIdentifier;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.math.BigDecimal;
Expand All @@ -32,6 +34,7 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -49,6 +52,7 @@
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;

/** Test class for {@link IcebergUtils}. */
@RunWith(Enclosed.class)
Expand Down Expand Up @@ -802,4 +806,40 @@ public void testStructIcebergSchemaToBeamSchema() {
assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
}
}

// Parameterized coverage for IcebergUtils.parseTableIdentifier: each case is
// {input string, expected toString() of the parsed identifier, whether parsing
// should succeed}.
@RunWith(Parameterized.class)
public static class TableIdentifierParseTests {

  @Parameterized.Parameters
  public static Collection<Object[]> data() {
    return Arrays.asList(
        new Object[][] {
          // JSON form: a namespace part containing dots survives intact.
          {
            "{\"namespace\": [\"dogs\", \"owners.and.handlers\"], \"name\": \"food\"}",
            "dogs.owners.and.handlers.food",
            true
          },
          // Legacy dotted form still parses (each dot is a separator here).
          {"dogs.owners.and.handlers.food", "dogs.owners.and.handlers.food", true},
          // JSON with no namespace: just a bare table name.
          {"{\"name\": \"food\"}", "food", true},
          // Valid JSON but wrong schema (no "name" key) must be rejected.
          {"{\"table_name\": \"food\"}", "{\"table_name\": \"food\"}", false},
        });
  }

  // Identifier string to parse.
  @Parameterized.Parameter public String input;

  // Expected toString() of the parsed identifier (unused when shouldSucceed is false).
  @Parameterized.Parameter(1)
  public String expected;

  // Whether parseTableIdentifier is expected to succeed for this input.
  @Parameterized.Parameter(2)
  public boolean shouldSucceed;

  @Test
  public void test() {
    if (shouldSucceed) {
      assertEquals(expected, parseTableIdentifier(input).toString());
    } else {
      assertThrows(IllegalArgumentException.class, () -> parseTableIdentifier(input));
    }
  }
}
}

0 comments on commit d6e0b0c

Please sign in to comment.