From 1d7772725916132802d413bbe0e093c39db88755 Mon Sep 17 00:00:00 2001 From: libailin Date: Wed, 16 Oct 2024 16:39:44 +0800 Subject: [PATCH] [Feature-#1924][opengauss] Add opengauss connector --- .../chunjun-connector-opengauss/pom.xml | 62 ++++ .../converter/OpengaussRawTypeMapper.java | 206 +++++++++++++ .../converter/OpengaussSyncConverter.java | 274 ++++++++++++++++++ .../opengauss/converter/logical/BitType.java | 78 +++++ .../opengauss/converter/logical/BoolType.java | 79 +++++ .../converter/logical/BpcharType.java | 80 +++++ .../converter/logical/ByteaType.java | 79 +++++ .../opengauss/converter/logical/CharType.java | 79 +++++ .../opengauss/converter/logical/DateType.java | 80 +++++ .../converter/logical/Float4Type.java | 79 +++++ .../converter/logical/Float8Type.java | 79 +++++ .../opengauss/converter/logical/Int2Type.java | 79 +++++ .../opengauss/converter/logical/Int4Type.java | 79 +++++ .../opengauss/converter/logical/Int8Type.java | 79 +++++ .../opengauss/converter/logical/JsonType.java | 80 +++++ .../converter/logical/JsonbType.java | 80 +++++ .../converter/logical/MoneyType.java | 79 +++++ .../opengauss/converter/logical/NameType.java | 80 +++++ .../converter/logical/NumericType.java | 81 ++++++ .../opengauss/converter/logical/OidType.java | 79 +++++ .../converter/logical/PgCustomType.java | 43 +++ .../converter/logical/PointType.java | 80 +++++ .../opengauss/converter/logical/TextType.java | 80 +++++ .../opengauss/converter/logical/TimeType.java | 80 +++++ .../converter/logical/TimestampType.java | 81 ++++++ .../converter/logical/TimestampTzType.java | 81 ++++++ .../converter/logical/TimetzType.java | 80 +++++ .../opengauss/converter/logical/UuidType.java | 79 +++++ .../converter/logical/VarbitType.java | 79 +++++ .../converter/logical/VarcharType.java | 80 +++++ .../opengauss/converter/logical/XmlType.java | 79 +++++ .../opengauss/dialect/OpengaussDialect.java | 163 +++++++++++ .../opengauss/sink/OpengaussOutputFormat.java | 196 +++++++++++++ .../opengauss/sink/OpengaussSinkFactory.java | 41 +++ .../source/OpengaussInputFormat.java | 97 +++++++ .../source/OpengaussSourceFactory.java | 32 ++ .../table/OpengaussDynamicTableFactory.java | 38 +++ .../org.apache.flink.table.factories.Factory | 16 + chunjun-connectors/pom.xml | 1 + 39 files changed, 3317 insertions(+) create mode 100644 chunjun-connectors/chunjun-connector-opengauss/pom.xml create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussRawTypeMapper.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussSyncConverter.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BitType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BoolType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BpcharType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/ByteaType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/CharType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/DateType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float4Type.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float8Type.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int2Type.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int4Type.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int8Type.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonbType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/MoneyType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NameType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NumericType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/OidType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PgCustomType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PointType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TextType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimeType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampTzType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimetzType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/UuidType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarbitType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarcharType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/XmlType.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/dialect/OpengaussDialect.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussOutputFormat.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussSinkFactory.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussInputFormat.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussSourceFactory.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/table/OpengaussDynamicTableFactory.java create mode 100644 chunjun-connectors/chunjun-connector-opengauss/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/chunjun-connectors/chunjun-connector-opengauss/pom.xml b/chunjun-connectors/chunjun-connector-opengauss/pom.xml new file mode 100644 index 0000000000..d110e479bd --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/pom.xml @@ -0,0 +1,62 @@ + + + + + + chunjun-connectors + com.dtstack.chunjun + ${revision} + + 4.0.0 + + chunjun-connector-opengauss + ChunJun : Connector : openGauss + + + opengauss + + + + + com.dtstack.chunjun + chunjun-connector-jdbc-base + ${project.version} + + + + + + org.opengauss + opengauss-jdbc + 5.0.1 + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + + diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussRawTypeMapper.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussRawTypeMapper.java new file mode 100644 index 0000000000..87ad6664e1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussRawTypeMapper.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter; + +import com.dtstack.chunjun.config.TypeConfig; +import com.dtstack.chunjun.connector.opengauss.converter.logical.BitType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.BoolType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.BpcharType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.ByteaType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.CharType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.DateType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.Float4Type; +import com.dtstack.chunjun.connector.opengauss.converter.logical.Float8Type; +import com.dtstack.chunjun.connector.opengauss.converter.logical.Int2Type; +import com.dtstack.chunjun.connector.opengauss.converter.logical.Int4Type; +import com.dtstack.chunjun.connector.opengauss.converter.logical.Int8Type; +import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonbType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.MoneyType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.NameType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.NumericType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.OidType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.PointType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.TextType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.TimeType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.TimestampType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.TimestampTzType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.TimetzType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.UuidType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.VarbitType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.VarcharType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.XmlType; +import com.dtstack.chunjun.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +public class OpengaussRawTypeMapper { + + /** + * https://docs-opengauss.osinfra.cn/zh/docs/5.0.0/docs/BriefTutorial/%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B.html + * + * @param type + */ + public static DataType apply(TypeConfig type) { + switch (type.getType()) { + case "BIT": + return new AtomicDataType(new BitType(true, LogicalTypeRoot.BOOLEAN, false)); + case "BOOLEAN": + case "BOOL": + return DataTypes.BOOLEAN(); + case "SMALLINT": + case "SMALLSERIAL": + case "INT2": + case "INT": + case "INTEGER": + case "SERIAL": + case "INT4": + return DataTypes.INT(); + case "BIGINT": + case "BIGSERIAL": + case "OID": + case "INT8": + return DataTypes.BIGINT(); + case "REAL": + case "FLOAT4": + return DataTypes.FLOAT(); + case "FLOAT": + case "DOUBLE PRECISION": + case "FLOAT8": + return DataTypes.DOUBLE(); + case "MONEY": + return new AtomicDataType(new MoneyType(true, LogicalTypeRoot.DOUBLE, false)); + case "DECIMAL": + case "NUMERIC": + return type.toDecimalDataType(); + case "CHARACTER VARYING": + case "VARCHAR": + case "CHARACTER": + case "CHAR": + case "TEXT": + case "NAME": + case "BPCHAR": + return DataTypes.STRING(); + // Binary Data Types + case "BYTEA": + return DataTypes.BYTES(); + case "VARBIT": + return new AtomicDataType(new VarbitType(true, LogicalTypeRoot.VARCHAR, false)); + case "XML": + return new AtomicDataType(new XmlType(true, LogicalTypeRoot.VARCHAR, false)); + case "UUID": + return new AtomicDataType(new UuidType(true, LogicalTypeRoot.VARCHAR, false)); + case "POINT": + return new AtomicDataType(new PointType(true, LogicalTypeRoot.VARCHAR, false)); + // Date/Time Types + case "ABSTIME": + case "TIMESTAMP": + case "TIMESTAMPTZ": + return type.toTimestampDataType(6); + case "DATE": + return DataTypes.DATE(); + case "TIME": + case "TIMETZ": + // todo check sync + return type.toTimeDataType(6); + case "JSON": + return new AtomicDataType(new JsonType(true, LogicalTypeRoot.VARCHAR, false)); + case "JSONB": + return new AtomicDataType(new JsonbType(true, LogicalTypeRoot.VARCHAR, false)); + case "_BIT": + return new AtomicDataType(new BitType(true, LogicalTypeRoot.VARCHAR, true)); + case "_BOOL": + return new AtomicDataType(new BoolType(true, LogicalTypeRoot.VARCHAR, true)); + case "_INT2": + return new AtomicDataType(new Int2Type(true, LogicalTypeRoot.VARCHAR, true)); + case "_INT4": + return new AtomicDataType(new Int4Type(true, LogicalTypeRoot.VARCHAR, true)); + case "_INT8": + return new AtomicDataType(new Int8Type(true, LogicalTypeRoot.VARCHAR, true)); + case "_FLOAT4": + return new AtomicDataType(new Float4Type(true, LogicalTypeRoot.VARCHAR, true)); + case "_FLOAT8": + return new AtomicDataType(new Float8Type(true, LogicalTypeRoot.VARCHAR, true)); + case "_NUMERIC": + return new AtomicDataType(new NumericType(true, LogicalTypeRoot.VARCHAR, true)); + case "_TIME": + return new AtomicDataType(new TimeType(true, LogicalTypeRoot.VARCHAR, true)); + case "_TIMETZ": + return new AtomicDataType(new TimetzType(true, LogicalTypeRoot.VARCHAR, true)); + case "_TIMESTAMP": + return new AtomicDataType(new TimestampType(true, LogicalTypeRoot.VARCHAR, true)); + case "_TIMESTAMPTZ": + return new AtomicDataType(new TimestampTzType(true, LogicalTypeRoot.VARCHAR, true)); + case "_DATE": + return new AtomicDataType(new DateType(true, LogicalTypeRoot.VARCHAR, true)); + case "_BYTEA": + return new AtomicDataType(new ByteaType(true, LogicalTypeRoot.VARCHAR, true)); + case "_VARCHAR": + return new AtomicDataType(new VarcharType(true, LogicalTypeRoot.VARCHAR, true)); + case "_OID": + return new AtomicDataType(new OidType(true, LogicalTypeRoot.VARCHAR, true)); + case "_BPCHAR": + return new AtomicDataType(new BpcharType(true, LogicalTypeRoot.VARCHAR, true)); + case "_TEXT": + return new AtomicDataType(new TextType(true, LogicalTypeRoot.VARCHAR, true)); + case "_MONEY": + return new AtomicDataType(new MoneyType(true, LogicalTypeRoot.VARCHAR, true)); + // case "_INTERVAL": + case "_CHAR": + return new AtomicDataType(new CharType(true, LogicalTypeRoot.VARCHAR, true)); + case "_VARBIT": + return new AtomicDataType(new VarbitType(true, LogicalTypeRoot.VARCHAR, true)); + case "_NAME": + return new AtomicDataType(new NameType(true, LogicalTypeRoot.VARCHAR, true)); + case "_UUID": + return new AtomicDataType(new UuidType(true, LogicalTypeRoot.VARCHAR, true)); + case "_XML": + return new AtomicDataType(new XmlType(true, LogicalTypeRoot.VARCHAR, true)); + // case "_POINT": + // case "_JSONB": + // case "_JSON": + // case "_REF_CURSOR": + + // 以下类型无法支持 + // Enumerated Types + + // Geometric Types + // case "LINE": + // case "LSEG": + // case "BOX": + // case "PATH": + // case "POLYGON": + // case "CIRCLE": + + // Network Address Types + + // + // // JSON Types + // case "JSONB": + // case "JSONPATH": + // return DataTypes.STRING(); + + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussSyncConverter.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussSyncConverter.java new file mode 100644 index 0000000000..fad14a7c46 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/OpengaussSyncConverter.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter; + +import com.dtstack.chunjun.config.CommonConfig; +import com.dtstack.chunjun.connector.jdbc.converter.JdbcSyncConverter; +import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.chunjun.connector.opengauss.converter.logical.BitType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.JsonbType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.MoneyType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.PgCustomType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.PointType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.UuidType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.VarbitType; +import com.dtstack.chunjun.connector.opengauss.converter.logical.XmlType; +import com.dtstack.chunjun.converter.IDeserializationConverter; +import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.element.column.BigDecimalColumn; +import com.dtstack.chunjun.element.column.BooleanColumn; +import com.dtstack.chunjun.element.column.BytesColumn; +import com.dtstack.chunjun.element.column.DoubleColumn; +import com.dtstack.chunjun.element.column.FloatColumn; +import com.dtstack.chunjun.element.column.IntColumn; +import com.dtstack.chunjun.element.column.LongColumn; +import com.dtstack.chunjun.element.column.SqlDateColumn; +import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimeColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; +import com.dtstack.chunjun.element.column.YearMonthColumn; +import com.dtstack.chunjun.util.DateUtil; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; + +import org.opengauss.core.BaseConnection; +import org.opengauss.jdbc.PgArray; +import org.opengauss.jdbc.PgSQLXML; +import org.opengauss.util.PGobject; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.UUID; + +public class OpengaussSyncConverter extends JdbcSyncConverter { + + private static final long serialVersionUID = 7381732546960782333L; + + private transient BaseConnection connection; + + public OpengaussSyncConverter(RowType rowType, CommonConfig commonConfig) { + super(rowType, commonConfig); + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return val -> new BooleanColumn((Boolean) val); + case INTEGER: + return val -> new IntColumn((Integer) val); + case INTERVAL_YEAR_MONTH: + return (IDeserializationConverter) + val -> { + YearMonthIntervalType yearMonthIntervalType = + (YearMonthIntervalType) type; + switch (yearMonthIntervalType.getResolution()) { + case YEAR: + return new YearMonthColumn( + Integer.parseInt(String.valueOf(val).substring(0, 4))); + case MONTH: + case YEAR_TO_MONTH: + default: + throw new UnsupportedOperationException( + "jdbc converter only support YEAR"); + } + }; + case FLOAT: + return val -> new FloatColumn((Float) val); + case DOUBLE: + return val -> new DoubleColumn((Double) val); + case BIGINT: + return val -> new LongColumn((Long) val); + case DECIMAL: + return val -> new BigDecimalColumn((BigDecimal) val); + case CHAR: + case VARCHAR: + if (type instanceof PgCustomType && ((PgCustomType) type).isArray()) { + return val -> new StringColumn(val.toString()); + } else if (type instanceof JsonType + || type instanceof JsonbType + || type instanceof VarbitType + || type instanceof PointType) { + return val -> new StringColumn(((PGobject) val).getValue()); + } else if (type instanceof UuidType) { + return val -> new StringColumn(((UUID) val).toString()); + } else if (type instanceof XmlType) { + return val -> new StringColumn(((PgSQLXML) val).getString()); + } + return val -> new StringColumn(val.toString()); + case DATE: + return val -> new SqlDateColumn((Date) val); + case TIME_WITHOUT_TIME_ZONE: + return val -> new TimeColumn((Time) val); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (IDeserializationConverter) + val -> { + if (val instanceof PGobject) { + return new TimestampColumn( + DateUtil.convertToTimestampWithZone(val.toString()), 0); + } + + return new TimestampColumn( + (Timestamp) val, ((TimestampType) (type)).getPrecision()); + }; + case BINARY: + case VARBINARY: + return val -> new BytesColumn((byte[]) val); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + @Override + protected ISerializationConverter createExternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + PGobject pgObject = new PGobject(); + pgObject.setType("bit"); + if (type instanceof BitType) { + return (val, index, statement) -> { + pgObject.setValue( + ((ColumnRowData) val).getField(index).asBoolean() ? "1" : "0"); + statement.setObject(index, pgObject); + }; + } + return (val, index, statement) -> + statement.setBoolean( + index, ((ColumnRowData) val).getField(index).asBoolean()); + case INTEGER: + return (val, index, statement) -> + statement.setInt(index, ((ColumnRowData) val).getField(index).asInt()); + case FLOAT: + return (val, index, statement) -> + statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat()); + case DOUBLE: + if (type instanceof MoneyType) { + PGobject pGobject = new PGobject(); + pGobject.setType("money"); + return (val, index, statement) -> { + pGobject.setValue(((ColumnRowData) val).getField(index).asString()); + statement.setObject(index, pGobject); + }; + } + return (val, index, statement) -> + statement.setDouble( + index, ((ColumnRowData) val).getField(index).asDouble()); + case BIGINT: + return (val, index, statement) -> + statement.setLong(index, ((ColumnRowData) val).getField(index).asLong()); + case DECIMAL: + return (val, index, statement) -> + statement.setBigDecimal( + index, ((ColumnRowData) val).getField(index).asBigDecimal()); + case CHAR: + case VARCHAR: + if (type instanceof PgCustomType && ((PgCustomType) type).isArray()) { + final int oid = ((PgCustomType) type).getArrayOid(); + return (val, index, statement) -> + statement.setArray( + index, + new PgArray( + connection, + oid, + (String) + ((ColumnRowData) val) + .getField(index) + .getData())); + } else if (type instanceof JsonType) { + PGobject jsonObject = new PGobject(); + jsonObject.setType("json"); + return (val, index, statement) -> { + jsonObject.setValue(((ColumnRowData) val).getField(index).asString()); + statement.setObject(index, jsonObject); + }; + } else if (type instanceof JsonbType) { + PGobject jsonObject = new PGobject(); + jsonObject.setType("jsonb"); + return (val, index, statement) -> { + jsonObject.setValue(((ColumnRowData) val).getField(index).asString()); + statement.setObject(index, jsonObject); + }; + } else if (type instanceof VarbitType) { + PGobject jsonObject = new PGobject(); + jsonObject.setType("varbit"); + return (val, index, statement) -> { + jsonObject.setValue(((ColumnRowData) val).getField(index).asString()); + statement.setObject(index, jsonObject); + }; + } else if (type instanceof XmlType) { + PGobject jsonObject = new PGobject(); + jsonObject.setType("xml"); + return (val, index, statement) -> { + jsonObject.setValue(((ColumnRowData) val).getField(index).asString()); + statement.setObject(index, jsonObject); + }; + } else if (type instanceof UuidType) { + PGobject jsonObject = new PGobject(); + jsonObject.setType("uuid"); + return (val, index, statement) -> { + jsonObject.setValue(((ColumnRowData) val).getField(index).asString()); + statement.setObject(index, jsonObject); + }; + } else if (type instanceof PointType) { + PGobject jsonObject = new PGobject(); + jsonObject.setType("point"); + return (val, index, statement) -> { + jsonObject.setValue(((ColumnRowData) val).getField(index).asString()); + statement.setObject(index, jsonObject); + }; + } + return (val, index, statement) -> + statement.setString( + index, ((ColumnRowData) val).getField(index).asString()); + case DATE: + return (val, index, statement) -> + statement.setDate(index, ((ColumnRowData) val).getField(index).asSqlDate()); + case TIME_WITHOUT_TIME_ZONE: + return (val, index, statement) -> + statement.setTime(index, ((ColumnRowData) val).getField(index).asTime()); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (val, index, statement) -> + statement.setTimestamp( + index, ((ColumnRowData) val).getField(index).asTimestamp()); + + case BINARY: + case VARBINARY: + return (val, index, statement) -> + statement.setBytes(index, ((ColumnRowData) val).getField(index).asBytes()); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + public void setConnection(BaseConnection connection) { + this.connection = connection; + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BitType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BitType.java new file mode 100644 index 0000000000..2c2b1e915c --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BitType.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class BitType extends PgCustomType { + private static final Set INPUT_CONVERSION = + conversionSet(Boolean.class.getName(), Boolean.class.getName()); + + private static final Class DEFAULT_CONVERSION = Boolean.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public BitType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.BIT_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-BIT"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new BitType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BoolType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BoolType.java new file mode 100644 index 0000000000..bc1219aa09 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BoolType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class BoolType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Boolean.class.getName(), Boolean.class.getName()); + + private static final Class DEFAULT_CONVERSION = Boolean.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public BoolType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.BOOL_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-BOOL"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new BoolType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BpcharType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BpcharType.java new file mode 100644 index 0000000000..73d0df3a61 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/BpcharType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class BpcharType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), StringData.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public BpcharType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.BPCHAR_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-BPCHAR"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new BpcharType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/ByteaType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/ByteaType.java new file mode 100644 index 0000000000..744839ad64 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/ByteaType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class ByteaType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(byte[].class.getName(), byte[].class.getName()); + + private static final Class DEFAULT_CONVERSION = byte[].class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public ByteaType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.BYTEA_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-BYTEA"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new ByteaType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/CharType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/CharType.java new file mode 100644 index 0000000000..b4e9684a6e --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/CharType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class CharType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(char.class.getName(), char.class.getName()); + + private static final Class DEFAULT_CONVERSION = char.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public CharType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.CHAR_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-CHAR"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new CharType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/DateType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/DateType.java new file mode 100644 index 0000000000..96e7f849ec --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/DateType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.sql.Date; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class DateType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Date.class.getName(), Date.class.getName()); + + private static final Class DEFAULT_CONVERSION = Date.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public DateType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.DATE_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-DATE"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new DateType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float4Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float4Type.java new file mode 100644 index 0000000000..0ee7752b2c --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float4Type.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class Float4Type extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Float.class.getName(), Float.class.getName()); + + private static final Class DEFAULT_CONVERSION = Float.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public Float4Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.FLOAT4_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-FLOAT4"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new Float4Type(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float8Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float8Type.java new file mode 100644 index 0000000000..9b0ea89d48 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Float8Type.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class Float8Type extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Double.class.getName(), Double.class.getName()); + + private static final Class DEFAULT_CONVERSION = Double.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public Float8Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.FLOAT8_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-FLOAT8"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new Float8Type(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int2Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int2Type.java new file mode 100644 index 0000000000..d5bc4a27fc --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int2Type.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class Int2Type extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Integer.class.getName(), Integer.class.getName()); + + private static final Class DEFAULT_CONVERSION = Integer.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public Int2Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.INT2_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-INT2"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new Int2Type(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int4Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int4Type.java new file mode 100644 index 0000000000..150d79bc46 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int4Type.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class Int4Type extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Integer.class.getName(), Integer.class.getName()); + + private static final Class DEFAULT_CONVERSION = Integer.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public Int4Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.INT4_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-INT4"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new Int4Type(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int8Type.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int8Type.java new file mode 100644 index 0000000000..83d5696485 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/Int8Type.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class Int8Type extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Long.class.getName(), Long.class.getName()); + + private static final Class DEFAULT_CONVERSION = Long.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public Int8Type(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.INT8_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-INT8"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new Int8Type(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonType.java new file mode 100644 index 0000000000..3c90ee5ec4 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class JsonType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), StringData.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public JsonType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.JSON_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-JSON"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new JsonType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonbType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonbType.java new file mode 100644 index 0000000000..256342797f --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/JsonbType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class JsonbType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), StringData.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public JsonbType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.JSONB_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-JSONB"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new JsonbType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/MoneyType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/MoneyType.java new file mode 100644 index 0000000000..a131d413f9 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/MoneyType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class MoneyType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Double.class.getName(), Double.class.getName()); + + private static final Class DEFAULT_CONVERSION = Double.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public MoneyType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.MONEY_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-MONEY"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new MoneyType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NameType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NameType.java new file mode 100644 index 0000000000..cec1ae00cb --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NameType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class NameType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), StringData.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public NameType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.NAME_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-NAME"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new NameType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NumericType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NumericType.java new file mode 100644 index 0000000000..ac6c67d877 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/NumericType.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.math.BigDecimal; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class NumericType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(BigDecimal.class.getName(), DecimalData.class.getName()); + + private static final Class DEFAULT_CONVERSION = BigDecimal.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public NumericType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.NUMERIC_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-NUMERIC"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new NumericType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/OidType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/OidType.java new file mode 100644 index 0000000000..504e4ae7cc --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/OidType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class OidType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Long.class.getName(), Long.class.getName()); + + private static final Class DEFAULT_CONVERSION = Long.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public OidType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.INT8_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-OID"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new OidType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PgCustomType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PgCustomType.java new file mode 100644 index 0000000000..01e8e8dde0 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PgCustomType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; + +public abstract class PgCustomType extends LogicalType { + + private final boolean isArray; + private final int arrayOid; + + public PgCustomType( + boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray, Integer oid) { + super(isNullable, typeRoot); + this.isArray = isArray; + this.arrayOid = oid; + } + + public boolean isArray() { + return isArray; + } + + public int getArrayOid() { + return arrayOid; + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PointType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PointType.java new file mode 100644 index 0000000000..e82f53d18c --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/PointType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class PointType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), StringData.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public PointType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.POINT_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-POINT"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new PointType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TextType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TextType.java new file mode 100644 index 0000000000..5a85b39727 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TextType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class TextType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), StringData.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public TextType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.TEXT_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-TEXT"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new TextType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimeType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimeType.java new file mode 100644 index 0000000000..268d401204 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimeType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.sql.Time; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class TimeType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Time.class.getName(), Time.class.getName()); + + private static final Class DEFAULT_CONVERSION = Time.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public TimeType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.TIME_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-TIME"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new TimeType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampType.java new file mode 100644 index 0000000000..0d7acd73f1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampType.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.sql.Timestamp; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class TimestampType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Timestamp.class.getName(), TimestampData.class.getName()); + + private static final Class DEFAULT_CONVERSION = Timestamp.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public TimestampType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.TIMESTAMP_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-TIMESTAMP"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new TimestampType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampTzType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampTzType.java new file mode 100644 index 0000000000..1fd4462307 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimestampTzType.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.sql.Timestamp; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class TimestampTzType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Timestamp.class.getName(), TimestampData.class.getName()); + + private static final Class DEFAULT_CONVERSION = Timestamp.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public TimestampTzType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.TIMESTAMPTZ_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-TIMESTAMPTZ"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new TimestampTzType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimetzType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimetzType.java new file mode 100644 index 0000000000..8d0fa78d37 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/TimetzType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.sql.Time; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class TimetzType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(Time.class.getName(), Time.class.getName()); + + private static final Class DEFAULT_CONVERSION = Time.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public TimetzType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.TIMETZ_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-TIMETZ"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new TimetzType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/UuidType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/UuidType.java new file mode 100644 index 0000000000..1bead707f7 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/UuidType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class UuidType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), String.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public UuidType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.UUID_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-UUID"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new UuidType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarbitType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarbitType.java new file mode 100644 index 0000000000..effc7aea7a --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarbitType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class VarbitType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), String.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public VarbitType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.VARBIT_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-VARBIT"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new VarbitType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarcharType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarcharType.java new file mode 100644 index 0000000000..c2451b9156 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/VarcharType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class VarcharType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), StringData.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public VarcharType(boolean isNullable, LogicalTypeRoot typeRoot, Boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.VARCHAR_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-VARCHAR"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new VarcharType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/XmlType.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/XmlType.java new file mode 100644 index 0000000000..0c096d2dfd --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/converter/logical/XmlType.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.converter.logical; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; + +import org.opengauss.core.Oid; + +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class XmlType extends PgCustomType { + + private static final Set INPUT_CONVERSION = + conversionSet(String.class.getName(), String.class.getName()); + + private static final Class DEFAULT_CONVERSION = String.class; + + private static final Set OUTPUT_CONVERSION = conversionSet(Reader.class.getName()); + + public XmlType(boolean isNullable, LogicalTypeRoot typeRoot, boolean isArray) { + super(isNullable, typeRoot, isArray, Oid.XML_ARRAY); + } + + @Override + public String asSerializableString() { + return "PG-XML"; + } + + @Override + public boolean supportsInputConversion(Class clazz) { + return INPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class clazz) { + return OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(LogicalTypeVisitor visitor) { + return visitor.visit(this); + } + + @Override + public LogicalType copy(boolean isNullable) { + return new XmlType(isNullable, getTypeRoot(), isArray()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/dialect/OpengaussDialect.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/dialect/OpengaussDialect.java new file mode 100644 index 0000000000..20d3d0552b --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/dialect/OpengaussDialect.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.dialect; + +import com.dtstack.chunjun.config.CommonConfig; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; +import com.dtstack.chunjun.connector.opengauss.converter.OpengaussRawTypeMapper; +import com.dtstack.chunjun.connector.opengauss.converter.OpengaussSyncConverter; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.RawTypeMapper; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import io.vertx.core.json.JsonArray; +import org.apache.commons.lang3.StringUtils; + +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; + +public class OpengaussDialect implements JdbcDialect { + + private static final long serialVersionUID = -8973956767518190621L; + + private static final String DIALECT_NAME = "openGauss"; + private static final String DRIVER = "org.opengauss.Driver"; + public static final String URL_START = "jdbc:opengauss:"; + + protected static final String COPY_SQL_TEMPL = + "copy %s(%s) from stdin DELIMITER '%s' NULL as '%s'"; + + @Override + public String dialectName() { + return DIALECT_NAME; + } + + @Override + public boolean canHandle(String url) { + return url.startsWith(URL_START); + } + + @Override + public RawTypeMapper getRawTypeConverter() { + return OpengaussRawTypeMapper::apply; + } + + @Override + public AbstractRowConverter + getColumnConverter(RowType rowType, CommonConfig commonConfig) { + return new OpengaussSyncConverter(rowType, commonConfig); + } + + @Override + public Optional defaultDriverName() { + return Optional.of(DRIVER); + } + + @Override + public boolean supportUpsert() { + return true; + } + + /** openGauss upsert query. It use ON CONFLICT ... DO UPDATE SET.. to replace into openGauss. */ + @Override + public Optional getUpsertStatement( + String schema, + String tableName, + String[] fieldNames, + String[] uniqueKeyFields, + boolean allReplace) { + String updateClause; + String uniqueColumns = + Arrays.stream(uniqueKeyFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + updateClause = + Arrays.stream(fieldNames) + .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f)) + .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f)) + .collect(Collectors.joining(", ")); + + return Optional.of( + getInsertIntoStatement(schema, tableName, fieldNames) + + " ON CONFLICT (" + + uniqueColumns + + ")" + + " DO UPDATE SET " + + updateClause); + } + + @Override + public String getSelectFromStatement( + String schemaName, + String tableName, + String customSql, + String[] selectFields, + String where) { + String selectExpressions = + Arrays.stream(selectFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + StringBuilder sql = new StringBuilder(128); + sql.append("SELECT "); + if (StringUtils.isNotBlank(customSql)) { + sql.append("* FROM (") + .append(customSql) + .append(") ") + .append(JdbcUtil.TEMPORARY_TABLE_NAME); + } else { + sql.append(selectExpressions).append(" FROM "); + if (StringUtils.isNotBlank(schemaName)) { + sql.append(quoteIdentifier(schemaName)).append(" ."); + } + sql.append(quoteIdentifier(tableName)); + } + + if (StringUtils.isNotBlank(where)) { + sql.append(" WHERE ").append(where); + } + + return sql.toString(); + } + + public String getCopyStatement( + String schemaName, + String tableName, + String[] fields, + String fieldDelimiter, + String nullVal) { + String fieldsExpression = + Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(", ")); + + String tableLocation; + if (schemaName != null && !"".equals(schemaName.trim())) { + tableLocation = quoteIdentifier(schemaName) + "." + quoteIdentifier(tableName); + } else { + tableLocation = quoteIdentifier(tableName); + } + + return String.format( + COPY_SQL_TEMPL, tableLocation, fieldsExpression, fieldDelimiter, nullVal); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussOutputFormat.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussOutputFormat.java new file mode 100644 index 0000000000..6bbf7fee85 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussOutputFormat.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.sink; + +import com.dtstack.chunjun.connector.jdbc.converter.JdbcSyncConverter; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat; +import com.dtstack.chunjun.connector.opengauss.converter.OpengaussSyncConverter; +import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.enums.EWriteMode; +import com.dtstack.chunjun.throwable.NoRestartException; +import com.dtstack.chunjun.throwable.WriteRecordException; + +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.StringUtils; + +import lombok.extern.slf4j.Slf4j; +import org.opengauss.copy.CopyManager; +import org.opengauss.core.BaseConnection; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; + +@Slf4j +public class OpengaussOutputFormat extends JdbcOutputFormat { + + private static final long serialVersionUID = 6244886558080900510L; + + // pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00 + public static final String SPACE = "\u0000"; + + private static final String LINE_DELIMITER = "\n"; + private CopyManager copyManager; + private boolean enableCopyMode = false; + private String copySql = ""; + private static final String INSERT_SQL_MODE_TYPE = "copy"; + private static final String DEFAULT_FIELD_DELIMITER = "\001"; + + private static final String DEFAULT_NULL_VALUE = "\002"; + + @Override + protected void openInternal(int taskNumber, int numTasks) { + super.openInternal(taskNumber, numTasks); + try { + // check is use copy mode for insert + enableCopyMode = INSERT_SQL_MODE_TYPE.equalsIgnoreCase(jdbcConfig.getInsertSqlMode()); + if (EWriteMode.INSERT.name().equalsIgnoreCase(jdbcConfig.getMode()) && enableCopyMode) { + copyManager = new CopyManager((BaseConnection) dbConn); + + OpengaussDialect pgDialect = (OpengaussDialect) jdbcDialect; + copySql = + pgDialect.getCopyStatement( + jdbcConfig.getSchema(), + jdbcConfig.getTable(), + columnNameList.toArray(new String[0]), + StringUtils.isNullOrWhitespaceOnly( + jdbcConfig.getFieldDelim().trim()) + ? DEFAULT_FIELD_DELIMITER + : jdbcConfig.getFieldDelim(), + StringUtils.isNullOrWhitespaceOnly(jdbcConfig.getNullDelim().trim()) + ? DEFAULT_NULL_VALUE + : jdbcConfig.getNullDelim()); + + log.info("write sql:{}", copySql); + } + if (rowConverter instanceof JdbcSyncConverter) { + if (jdbcDialect.dialectName().equals("openGauss")) { + ((OpengaussSyncConverter) rowConverter).setConnection((BaseConnection) dbConn); + } + } + } catch (SQLException sqe) { + throw new IllegalArgumentException("checkUpsert() failed.", sqe); + } + } + + @Override + protected void writeSingleRecordInternal(RowData row) throws WriteRecordException { + if (!enableCopyMode) { + super.writeSingleRecordInternal(row); + } else { + if (rowConverter instanceof JdbcSyncConverter) { + ColumnRowData colRowData = (ColumnRowData) row; + // write with copy + int index = 0; + try { + StringBuilder rowStr = new StringBuilder(); + int lastIndex = row.getArity() - 1; + for (; index < row.getArity(); index++) { + appendColumn(colRowData, index, rowStr, index == lastIndex); + } + String rowVal = copyModeReplace(rowStr.toString()); + ByteArrayInputStream bi = + new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8)); + copyManager.copyIn(copySql, bi); + } catch (Exception e) { + processWriteException(e, index, row); + } + } else { + throw new NoRestartException("copy mode only support data sync with out table"); + } + } + } + + @Override + protected void writeMultipleRecordsInternal() throws Exception { + if (!enableCopyMode) { + super.writeMultipleRecordsInternal(); + } else { + if (rowConverter instanceof JdbcSyncConverter) { + StringBuilder rowsStrBuilder = new StringBuilder(128); + for (RowData row : rows) { + ColumnRowData colRowData = (ColumnRowData) row; + int lastIndex = row.getArity() - 1; + StringBuilder rowStr = new StringBuilder(128); + for (int index = 0; index < row.getArity(); index++) { + appendColumn(colRowData, index, rowStr, index == lastIndex); + } + String tempData = rowStr.toString(); + rowsStrBuilder.append(copyModeReplace(tempData)).append(LINE_DELIMITER); + } + String rowVal = rowsStrBuilder.toString(); + ByteArrayInputStream bi = + new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8)); + copyManager.copyIn(copySql, bi); + + if (checkpointEnabled && CheckpointingMode.EXACTLY_ONCE == checkpointMode) { + rowsOfCurrentTransaction += rows.size(); + } + } else { + throw new NoRestartException("copy mode only support data sync with out table"); + } + } + } + + private void appendColumn( + ColumnRowData colRowData, int pos, StringBuilder rowStr, boolean isLast) { + Object col = colRowData.getField(pos); + if (col == null) { + rowStr.append( + StringUtils.isNullOrWhitespaceOnly(jdbcConfig.getNullDelim().trim()) + ? DEFAULT_NULL_VALUE + : jdbcConfig.getNullDelim()); + + } else { + rowStr.append(col); + } + if (!isLast) { + rowStr.append( + StringUtils.isNullOrWhitespaceOnly(jdbcConfig.getFieldDelim().trim()) + ? DEFAULT_FIELD_DELIMITER + : jdbcConfig.getFieldDelim()); + } + } + + /** + * \r \n \ 等特殊字符串需要转义 + * + * @return + */ + private String copyModeReplace(String rowStr) { + if (rowStr.contains("\\")) { + rowStr = rowStr.replaceAll("\\\\", "\\\\\\\\"); + } + if (rowStr.contains("\r")) { + rowStr = rowStr.replaceAll("\r", "\\\\r"); + } + + if (rowStr.contains("\n")) { + rowStr = rowStr.replaceAll("\n", "\\\\n"); + } + + // pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00 + if (rowStr.contains(SPACE)) { + rowStr = rowStr.replaceAll(SPACE, ""); + } + return rowStr; + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussSinkFactory.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussSinkFactory.java new file mode 100644 index 0000000000..64c4d64155 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/sink/OpengaussSinkFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.sink; + +import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory; +import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect; + +public class OpengaussSinkFactory extends JdbcSinkFactory { + + public OpengaussSinkFactory(SyncConfig syncConfig) { + super(syncConfig, new OpengaussDialect()); + } + + public OpengaussSinkFactory(SyncConfig syncConfig, JdbcDialect dialect) { + super(syncConfig, dialect); + } + + @Override + protected JdbcOutputFormatBuilder getBuilder() { + return new JdbcOutputFormatBuilder(new OpengaussOutputFormat()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussInputFormat.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussInputFormat.java new file mode 100644 index 0000000000..6450f01932 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussInputFormat.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.source; + +import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat; +import com.dtstack.chunjun.connector.jdbc.util.SqlUtil; +import com.dtstack.chunjun.util.ExceptionUtil; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class OpengaussInputFormat extends JdbcInputFormat { + + private static final long serialVersionUID = -4046186579949376769L; + + @Override + protected void queryPollingWithOutStartLocation() throws SQLException { + // In opengauss, if resultCursorType is FORWARD_ONLY + // , the query will report an error after the method + // #setFetchDirection(ResultSet.FETCH_REVERSE) is called. + String querySql = + SqlUtil.buildOrderSql(jdbcConfig.getQuerySql(), jdbcConfig, jdbcDialect, "ASC"); + ps = + dbConn.prepareStatement( + querySql, ResultSet.TYPE_SCROLL_INSENSITIVE, resultSetConcurrency); + ps.setFetchSize(jdbcConfig.getFetchSize()); + ps.setQueryTimeout(jdbcConfig.getQueryTimeOut()); + resultSet = ps.executeQuery(); + hasNext = resultSet.next(); + + try { + // 间隔轮询一直循环,直到查询到数据库中的数据为止 + while (!hasNext) { + TimeUnit.MILLISECONDS.sleep(jdbcConfig.getPollingInterval()); + resultSet.close(); + // 如果事务不提交 就会导致数据库即使插入数据 也无法读到数据 + dbConn.commit(); + resultSet = ps.executeQuery(); + hasNext = resultSet.next(); + // 每隔五分钟打印一次,(当前时间 - 任务开始时间) % 300秒 <= 一个间隔轮询周期 + if ((System.currentTimeMillis() - startTime) % 300000 + <= jdbcConfig.getPollingInterval()) { + log.info( + "no record matched condition in database, execute query sql = {}, startLocation = {}", + jdbcConfig.getQuerySql(), + endLocationAccumulator.getLocalValue()); + } + } + } catch (InterruptedException e) { + log.warn( + "interrupted while waiting for polling, e = {}", + ExceptionUtil.getErrorMessage(e)); + } + + // 查询到数据,更新querySql + StringBuilder builder = new StringBuilder(128); + builder.append(jdbcConfig.getQuerySql()); + if (jdbcConfig.getQuerySql().contains("WHERE")) { + builder.append(" AND "); + } else { + builder.append(" WHERE "); + } + builder.append(jdbcDialect.quoteIdentifier(jdbcConfig.getIncreColumn())) + .append(" > ? ORDER BY ") + .append(jdbcDialect.quoteIdentifier(jdbcConfig.getIncreColumn())) + .append(" ASC"); + jdbcConfig.setQuerySql(builder.toString()); + ps = + dbConn.prepareStatement( + jdbcConfig.getQuerySql(), + ResultSet.TYPE_SCROLL_INSENSITIVE, + resultSetConcurrency); + ps.setFetchSize(jdbcConfig.getFetchSize()); + ps.setQueryTimeout(jdbcConfig.getQueryTimeOut()); + log.info("update querySql, sql = {}", jdbcConfig.getQuerySql()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussSourceFactory.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussSourceFactory.java new file mode 100644 index 0000000000..7876a073b7 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/source/OpengaussSourceFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.source; + +import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; +import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class OpengaussSourceFactory extends JdbcSourceFactory { + + public OpengaussSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) { + super(syncConfig, env, new OpengaussDialect()); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/table/OpengaussDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/table/OpengaussDynamicTableFactory.java new file mode 100644 index 0000000000..dc64cdaaef --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/java/com/dtstack/chunjun/connector/opengauss/table/OpengaussDynamicTableFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.opengauss.table; + +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory; +import com.dtstack.chunjun.connector.opengauss.dialect.OpengaussDialect; + +public class OpengaussDynamicTableFactory extends JdbcDynamicTableFactory { + + private static final String IDENTIFIER = "opengauss-x"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + protected JdbcDialect getDialect() { + return new OpengaussDialect(); + } +} diff --git a/chunjun-connectors/chunjun-connector-opengauss/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-opengauss/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..bb5441b8b9 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-opengauss/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.dtstack.chunjun.connector.opengauss.table.OpengaussDynamicTableFactory diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml index a91f7cfe07..4e14d2d0bc 100755 --- a/chunjun-connectors/pom.xml +++ b/chunjun-connectors/pom.xml @@ -54,6 +54,7 @@ chunjun-connector-sqlserver chunjun-connector-db2 chunjun-connector-postgresql + chunjun-connector-opengauss chunjun-connector-greenplum chunjun-connector-dm chunjun-connector-gbase