From 284c84f37c1b68cbd7098f13ec75c8bafc2d1f63 Mon Sep 17 00:00:00 2001 From: libailin Date: Thu, 8 Aug 2024 09:50:12 +0800 Subject: [PATCH] [hotfix-#1911][starrocks] fixed stream load failed do not throw exception --- .../chunjun-connector-gbase8s/pom.xml | 35 +++++++++++++++++++ .../starrocks/sink/StarRocksOutputFormat.java | 10 ++++++ .../streamload/StreamLoadManager.java | 4 +++ 3 files changed, 49 insertions(+) diff --git a/chunjun-connectors/chunjun-connector-gbase8s/pom.xml b/chunjun-connectors/chunjun-connector-gbase8s/pom.xml index d169f9262c..bf317b4ee3 100644 --- a/chunjun-connectors/chunjun-connector-gbase8s/pom.xml +++ b/chunjun-connectors/chunjun-connector-gbase8s/pom.xml @@ -53,6 +53,41 @@ org.apache.maven.plugins maven-shade-plugin + 3.1.0 + + + package + + shade + + + false + + + org.slf4j:slf4j-api + log4j:log4j + ch.qos.logback:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.bson + com.dtstack.chunjun.connector.gbase8s.shaded.org.bson + + + + + org.apache.maven.plugins diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java index 69f304874d..8bdfc7097b 100644 --- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/sink/StarRocksOutputFormat.java @@ -39,6 +39,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.io.IOException; import java.sql.SQLException; import java.sql.Statement; import java.util.List; @@ -199,6 +200,15 @@ public String handleErrMessage(StarRocksStreamLoadFailedException e) { message, JSON.toJSONString(failedResponse)); } + @Override + public synchronized void close() throws IOException { + super.close(); + // 解决当异步执行streamLoad时,flushException不为空,则认为整个任务应该抛出异常 + if (streamLoadManager != null && streamLoadManager.getFlushException() != null) { + throw new RuntimeException(streamLoadManager.getFlushException()); + } + } + @Override protected void closeInternal() { if (streamLoadManager != null) { diff --git a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StreamLoadManager.java b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StreamLoadManager.java index 2bd0376373..af174f557b 100644 --- a/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StreamLoadManager.java +++ b/chunjun-connectors/chunjun-connector-starrocks/src/main/java/com/dtstack/chunjun/connector/starrocks/streamload/StreamLoadManager.java @@ -332,4 +332,8 @@ public boolean tableHasPartition() { return starrocksQueryVisitor.hasPartitions( starRocksConfig.getDatabase(), starRocksConfig.getTable()); } + + public Throwable getFlushException() { + return flushException; + } }