Skip to content

Commit

Permalink
[Feature-#1918][s3] Add support for reading all types of documents su…
Browse files Browse the repository at this point in the history
…pported by Apache Tika, read excel format
  • Loading branch information
libailin authored and lihongwei committed Sep 20, 2024
1 parent 112f183 commit 6bbd789
Show file tree
Hide file tree
Showing 40 changed files with 5,600 additions and 28 deletions.
9 changes: 8 additions & 1 deletion chunjun-connectors/chunjun-connector-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,15 @@
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-format-base</artifactId>
<artifactId>chunjun-format-tika</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-format-excel</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package com.dtstack.chunjun.connector.s3.config;

import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.connector.format.base.config.TikaReadConfig;
import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
import com.dtstack.chunjun.format.tika.config.TikaReadConfig;

import com.amazonaws.regions.Regions;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Expand Down Expand Up @@ -104,4 +105,6 @@ public class S3Config extends CommonConfig implements Serializable {
private boolean disableBucketNameInEndpoint = false;

private TikaReadConfig tikaReadConfig = new TikaReadConfig();

private ExcelFormatConfig excelFormatConfig = new ExcelFormatConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static com.dtstack.chunjun.connector.format.base.config.TikaReadConfig.ORIGINAL_FILENAME;
import static com.dtstack.chunjun.format.tika.config.TikaReadConfig.ORIGINAL_FILENAME;

/** The OutputFormat Implementation which write data to Amazon S3. */
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
field.setName(column.getName());
field.setType(
TypeConfig.fromString(column.getDataType().getLogicalType().asSummaryString()));
field.setIndex(i);
int index =
s3Config.getExcelFormatConfig().getColumnIndex() != null
? s3Config.getExcelFormatConfig()
.getColumnIndex()
.get(columns.indexOf(column))
: columns.indexOf(column);
field.setIndex(index);
columnList.add(field);
}
s3Config.setColumn(columnList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

package com.dtstack.chunjun.connector.s3.source;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.connector.format.base.common.TikaData;
import com.dtstack.chunjun.connector.format.base.source.TikaInputFormat;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.enums.CompressType;
import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
import com.dtstack.chunjun.connector.s3.util.S3SimpleObject;
import com.dtstack.chunjun.connector.s3.util.S3Util;
import com.dtstack.chunjun.format.excel.common.ExcelData;
import com.dtstack.chunjun.format.excel.source.ExcelInputFormat;
import com.dtstack.chunjun.format.tika.common.TikaData;
import com.dtstack.chunjun.format.tika.source.TikaInputFormat;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
Expand All @@ -40,6 +43,7 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -77,6 +81,9 @@ public class S3InputFormat extends BaseRichInputFormat {
private transient TikaData tikaData;
private TikaInputFormat tikaInputFormat;

private transient ExcelData excelData;
private ExcelInputFormat excelInputFormat;

@Override
public void openInputFormat() throws IOException {
super.openInputFormat();
Expand Down Expand Up @@ -143,10 +150,31 @@ protected InputSplit[] createInputSplitsInternal(int minNumSplits) {
protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
String[] fields;
try {
fields =
s3Config.getTikaReadConfig().isUseExtract() && tikaData != null
? tikaData.getData()
: readerUtil.getValues();
if (s3Config.getTikaReadConfig().isUseExtract() && tikaData != null) {
fields = tikaData.getData();
} else if (s3Config.getExcelFormatConfig().isUseExcelFormat() && excelData != null) {
fields = excelData.getData();
} else {
fields = readerUtil.getValues();
}
// Remap the fields when explicit column indexes are configured for the columns
if (s3Config.getExcelFormatConfig().getColumnIndex() != null) {
List<FieldConfig> columns = s3Config.getColumn();
String[] fieldsData = new String[columns.size()];
for (int i = 0; i < CollectionUtils.size(columns); i++) {
FieldConfig fieldConfig = columns.get(i);
if (fieldConfig.getIndex() >= fields.length) {
String errorMessage =
String.format(
"The column index is greater than the data size."
+ " The current column index is [%s], but the data size is [%s]. Data loss may occur.",
fieldConfig.getIndex(), fields.length);
throw new IllegalArgumentException(errorMessage);
}
fieldsData[i] = fields[fieldConfig.getIndex()];
}
fields = fieldsData;
}
rowData = rowConverter.toInternal(fields);
} catch (IOException e) {
throw new ChunJunRuntimeException(e);
Expand Down Expand Up @@ -176,10 +204,45 @@ public boolean reachedEnd() throws IOException {
if (s3Config.getTikaReadConfig().isUseExtract()) {
tikaData = getTikaData();
return tikaData == null || tikaData.getData() == null;
} else if (s3Config.getExcelFormatConfig().isUseExcelFormat()) {
excelData = getExcelData();
return excelData == null || excelData.getData() == null;
}
return reachedEndWithoutCheckState();
}

/**
 * Fetches the next Excel row from the current split, advancing to the next split when the
 * current reader is exhausted.
 *
 * <p>Implemented as a loop rather than by self-recursion: every exhausted reader is closed and
 * the next split is opened in the same stack frame, so a long run of objects that yield no rows
 * cannot grow the call stack.
 *
 * @return an {@link ExcelData} wrapping the next record, or {@code null} when no reader could
 *     be opened because there are no splits left
 */
public ExcelData getExcelData() {
    while (true) {
        if (excelInputFormat == null) {
            // Opens a reader for the next split, or leaves the field null when none remain.
            nextExcelDataStream();
        }
        if (excelInputFormat == null) {
            return null;
        }
        if (excelInputFormat.hasNext()) {
            String[] record = excelInputFormat.nextRecord();
            return new ExcelData(record);
        }
        // Current object is exhausted: release it and try the next split on the next pass.
        excelInputFormat.close();
        excelInputFormat = null;
    }
}

/**
 * Positions {@code excelInputFormat} on the next split.
 *
 * <p>When {@code splits} has another element, it becomes {@code currentObject}, the object is
 * fetched from the configured bucket and a new {@link ExcelInputFormat} is opened on its
 * content stream. Otherwise {@code excelInputFormat} is set to {@code null}.
 */
private void nextExcelDataStream() {
    if (!splits.hasNext()) {
        excelInputFormat = null;
        return;
    }

    currentObject = splits.next();
    GetObjectRequest request = new GetObjectRequest(s3Config.getBucket(), currentObject);
    log.info("Current read file {}", currentObject);

    S3ObjectInputStream content = amazonS3.getObject(request).getObjectContent();
    // The field is assigned before open() is called, exactly as before.
    excelInputFormat = new ExcelInputFormat();
    excelInputFormat.open(content, s3Config.getExcelFormatConfig());
}

public TikaData getTikaData() {
if (tikaInputFormat == null) {
nextTikaDataStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,36 @@

package com.dtstack.chunjun.connector.s3.table;

import com.dtstack.chunjun.connector.format.base.config.TikaReadConfig;
import com.dtstack.chunjun.connector.format.base.options.TikaOptions;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink;
import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource;
import com.dtstack.chunjun.connector.s3.table.options.S3Options;
import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
import com.dtstack.chunjun.format.excel.options.ExcelFormatOptions;
import com.dtstack.chunjun.format.tika.config.TikaReadConfig;
import com.dtstack.chunjun.format.tika.options.TikaOptions;
import com.dtstack.chunjun.table.options.SinkOptions;
import com.dtstack.chunjun.util.GsonUtil;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class S3DynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
private static final String IDENTIFIER = "s3-x";
Expand Down Expand Up @@ -71,7 +80,38 @@ public DynamicTableSource createDynamicTableSource(Context context) {
tikaReadConfig.setOverlapRatio(options.get(TikaOptions.OVERLAP_RATIO));
tikaReadConfig.setChunkSize(options.get(TikaOptions.CHUNK_SIZE));
s3Config.setTikaReadConfig(tikaReadConfig);
return new S3DynamicTableSource(context.getCatalogTable().getResolvedSchema(), s3Config);
ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
List<Column> columns = resolvedSchema.getColumns();
ExcelFormatConfig excelFormatConfig = new ExcelFormatConfig();
excelFormatConfig.setUseExcelFormat(options.get(ExcelFormatOptions.USE_EXCEL_FORMAT));
excelFormatConfig.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));
if (StringUtils.isNotBlank(options.get(ExcelFormatOptions.SHEET_NO))) {
List<Integer> sheetNo =
Arrays.stream(options.get(ExcelFormatOptions.SHEET_NO).split(","))
.map(Integer::parseInt)
.collect(Collectors.toList());
excelFormatConfig.setSheetNo(sheetNo);
}
if (StringUtils.isNotBlank(options.get(ExcelFormatOptions.COLUMN_INDEX))) {
List<Integer> columnIndex =
Arrays.stream(options.get(ExcelFormatOptions.COLUMN_INDEX).split(","))
.map(Integer::parseInt)
.collect(Collectors.toList());
excelFormatConfig.setColumnIndex(columnIndex);
}
final String[] fields = new String[columns.size()];
IntStream.range(0, fields.length).forEach(i -> fields[i] = columns.get(i).getName());
excelFormatConfig.setFields(fields);
s3Config.setExcelFormatConfig(excelFormatConfig);
if (s3Config.getExcelFormatConfig().getColumnIndex() != null
&& columns.size() != s3Config.getExcelFormatConfig().getColumnIndex().size()) {
throw new IllegalArgumentException(
String.format(
"The number of fields (%s) is inconsistent with the number of indexes (%s).",
columns.size(),
s3Config.getExcelFormatConfig().getColumnIndex().size()));
}
return new S3DynamicTableSource(resolvedSchema, s3Config);
}

@Override
Expand Down Expand Up @@ -112,6 +152,9 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(TikaOptions.USE_EXTRACT);
options.add(TikaOptions.CHUNK_SIZE);
options.add(TikaOptions.OVERLAP_RATIO);
options.add(ExcelFormatOptions.SHEET_NO);
options.add(ExcelFormatOptions.COLUMN_INDEX);
options.add(ExcelFormatOptions.USE_EXCEL_FORMAT);
return options;
}

Expand Down
1 change: 0 additions & 1 deletion chunjun-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
<module>chunjun-connector-nebula</module>
<module>chunjun-connector-kingbase</module>
<module>chunjun-connector-hudi</module>
<module>chunjun-connector-format-base</module>
</modules>

<dependencies>
Expand Down
59 changes: 59 additions & 0 deletions chunjun-formats/chunjun-format-excel/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-formats</artifactId>
<version>${revision}</version>
</parent>

<artifactId>chunjun-format-excel</artifactId>
<name>ChunJun : Formats : Excel</name>

<properties>
<format.dir>excel</format.dir>
</properties>

<dependencies>
<!-- EasyExcel is the only dependency declared directly by this module; its version is
     hard-coded here (3.2.0) rather than taken from a shared property. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.2.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Loading

0 comments on commit 6bbd789

Please sign in to comment.