From 756fb9941fb15134f2b2d5591fdaf71f666bdd3f Mon Sep 17 00:00:00 2001 From: Amit-CloudSufi Date: Wed, 11 Dec 2024 11:19:03 +0000 Subject: [PATCH] Error management for Snowflake source and sink, Added new validation for maximum split size and NPE issue handled --- .../snowflake/common/BaseSnowflakeConfig.java | 64 ++++--- .../plugin/snowflake/common/OAuthUtil.java | 23 ++- .../common/SnowflakeErrorDetailsProvider.java | 161 ++++++++++++++++++ .../snowflake/common/SnowflakeErrorType.java | 59 +++++++ .../common/client/SnowflakeAccessor.java | 20 ++- .../snowflake/sink/batch/CSVBuffer.java | 22 ++- .../sink/batch/SnowflakeBatchSink.java | 11 +- .../sink/batch/SnowflakeOutputFormat.java | 25 ++- .../sink/batch/SnowflakeRecordWriter.java | 34 +++- .../sink/batch/SnowflakeSinkAccessor.java | 36 +++- ...tructuredRecordToCSVRecordTransformer.java | 42 +++-- .../source/batch/SnowflakeBatchSource.java | 4 + .../batch/SnowflakeBatchSourceConfig.java | 15 ++ .../source/batch/SnowflakeInputFormat.java | 5 +- .../SnowflakeMapToRecordTransformer.java | 13 +- .../source/batch/SnowflakeSourceAccessor.java | 41 ++++- 16 files changed, 495 insertions(+), 80 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorDetailsProvider.java create mode 100644 src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java diff --git a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java index 718a707..7967cfd 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java @@ -230,33 +230,10 @@ public String getConnectionArguments() { } public void validate(FailureCollector collector) { - if (getOauth2Enabled()) { - if (!containsMacro(PROPERTY_CLIENT_ID) - && Strings.isNullOrEmpty(getClientId())) { - collector.addFailure("Client ID is not set.", null) - 
.withConfigProperty(PROPERTY_CLIENT_ID); - } - if (!containsMacro(PROPERTY_CLIENT_SECRET) - && Strings.isNullOrEmpty(getClientSecret())) { - collector.addFailure("Client Secret is not set.", null) - .withConfigProperty(PROPERTY_CLIENT_SECRET); - } - if (!containsMacro(PROPERTY_REFRESH_TOKEN) - && Strings.isNullOrEmpty(getRefreshToken())) { - collector.addFailure("Refresh Token is not set.", null) - .withConfigProperty(PROPERTY_REFRESH_TOKEN); - } - } else if (getKeyPairEnabled()) { - if (!containsMacro(PROPERTY_USERNAME) - && Strings.isNullOrEmpty(getUsername())) { - collector.addFailure("Username is not set.", null) - .withConfigProperty(PROPERTY_USERNAME); - } - if (!containsMacro(PROPERTY_PRIVATE_KEY) - && Strings.isNullOrEmpty(getPrivateKey())) { - collector.addFailure("Private Key is not set.", null) - .withConfigProperty(PROPERTY_PRIVATE_KEY); - } + if (Boolean.TRUE.equals(getOauth2Enabled())) { + validateWhenOath2Enabled(collector); + } else if (Boolean.TRUE.equals(getKeyPairEnabled())) { + validateWhenKeyPairEnabled(collector); } else { if (!containsMacro(PROPERTY_USERNAME) && Strings.isNullOrEmpty(getUsername())) { @@ -272,6 +249,37 @@ public void validate(FailureCollector collector) { validateConnection(collector); } + private void validateWhenKeyPairEnabled(FailureCollector collector) { + if (!containsMacro(PROPERTY_USERNAME) + && Strings.isNullOrEmpty(getUsername())) { + collector.addFailure("Username is not set.", null) + .withConfigProperty(PROPERTY_USERNAME); + } + if (!containsMacro(PROPERTY_PRIVATE_KEY) + && Strings.isNullOrEmpty(getPrivateKey())) { + collector.addFailure("Private Key is not set.", null) + .withConfigProperty(PROPERTY_PRIVATE_KEY); + } + } + + private void validateWhenOath2Enabled(FailureCollector collector) { + if (!containsMacro(PROPERTY_CLIENT_ID) + && Strings.isNullOrEmpty(getClientId())) { + collector.addFailure("Client ID is not set.", null) + .withConfigProperty(PROPERTY_CLIENT_ID); + } + if 
(!containsMacro(PROPERTY_CLIENT_SECRET) + && Strings.isNullOrEmpty(getClientSecret())) { + collector.addFailure("Client Secret is not set.", null) + .withConfigProperty(PROPERTY_CLIENT_SECRET); + } + if (!containsMacro(PROPERTY_REFRESH_TOKEN) + && Strings.isNullOrEmpty(getRefreshToken())) { + collector.addFailure("Refresh Token is not set.", null) + .withConfigProperty(PROPERTY_REFRESH_TOKEN); + } + } + public boolean canConnect() { return (!containsMacro(PROPERTY_DATABASE) && !containsMacro(PROPERTY_SCHEMA_NAME) && !containsMacro(PROPERTY_ACCOUNT_NAME) && !containsMacro(PROPERTY_USERNAME) @@ -299,7 +307,7 @@ protected void validateConnection(FailureCollector collector) { .withConfigProperty(PROPERTY_USERNAME); // TODO: for oauth2 - if (keyPairEnabled) { + if (Boolean.TRUE.equals(keyPairEnabled)) { failure.withConfigProperty(PROPERTY_PRIVATE_KEY); } else { failure.withConfigProperty(PROPERTY_PASSWORD); diff --git a/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java b/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java index 3d94a17..3809e81 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java @@ -19,18 +19,27 @@ import com.google.gson.JsonElement; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; +import io.cdap.plugin.snowflake.common.exception.SchemaParseException; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.entity.StringEntity; import 
org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; +import scala.xml.Null; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; import java.util.Base64; +import java.util.Objects; /** * A class which contains utilities to make OAuth2 specific calls. @@ -50,9 +59,15 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient httppost.setHeader("Content-type", "application/x-www-form-urlencoded"); // set grant type and refresh_token. It should be in body not url! - StringEntity entity = new StringEntity(String.format("refresh_token=%s&grant_type=refresh_token", - URLEncoder.encode(config.getRefreshToken(), "UTF-8"))); - httppost.setEntity(entity); + try { + StringEntity entity = new StringEntity(String.format("refresh_token=%s&grant_type=refresh_token", + URLEncoder.encode(Objects.requireNonNull(Objects.requireNonNull(config).getRefreshToken()), "UTF-8"))); + httppost.setEntity(entity); + } catch (NullPointerException e) { + String errorMessage = "Error encoding URL due to missing Refresh Token."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } // set 'Authorization' header String stringToEncode = config.getClientId() + ":" + config.getClientSecret(); @@ -72,7 +87,7 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient // if exception happened during parsing OR if json does not contain 'access_token' key. 
if (jsonElement == null) { - throw new RuntimeException(String.format("Unexpected response '%s' from '%s'", responseString, uri.toString())); + throw new RuntimeException(String.format("Unexpected response '%s' from '%s'", responseString, uri)); } return jsonElement.getAsString(); diff --git a/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorDetailsProvider.java new file mode 100644 index 0000000..db78d07 --- /dev/null +++ b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorDetailsProvider.java @@ -0,0 +1,161 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.snowflake.common; + +import com.google.common.base.Throwables; +import io.cdap.cdap.api.data.format.UnexpectedFormatException; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; +import io.cdap.plugin.snowflake.common.exception.SchemaParseException; + +import java.net.URISyntaxException; +import java.util.List; + + +/** + * Error details provided for the Snowflake + **/ +public class SnowflakeErrorDetailsProvider implements ErrorDetailsProvider { + + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. 
+ return null; + } + if (t instanceof IllegalArgumentException) { + return getProgramFailureException((IllegalArgumentException) t, errorContext); + } + if (t instanceof IllegalStateException) { + return getProgramFailureException((IllegalStateException) t, errorContext); + } + if (t instanceof URISyntaxException) { + return getProgramFailureException((URISyntaxException) t, errorContext); + } + if (t instanceof SchemaParseException) { + return getProgramFailureException((SchemaParseException) t, errorContext); + } + if (t instanceof UnexpectedFormatException) { + return getProgramFailureException((UnexpectedFormatException) t, errorContext); + } + if (t instanceof ConnectionTimeoutException) { + return getProgramFailureException((ConnectionTimeoutException) t, errorContext); + } + } + return null; + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link IllegalArgumentException}. + * + * @param e The IllegalArgumentException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link IllegalStateException}. + * + * @param e The IllegalStateException to get the error information from. + * @return A ProgramFailureException with the given error information. 
+ */ + private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link URISyntaxException}. + * + * @param e The URISyntaxException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(URISyntaxException e, + ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link SchemaParseException}. + * + * @param e The SchemaParseException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(SchemaParseException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. 
Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link UnexpectedFormatException}. + * + * @param e The UnexpectedFormatException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(UnexpectedFormatException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link ConnectionTimeoutException}. + * + * @param e The ConnectionTimeoutException to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(ConnectionTimeoutException e, ErrorContext errorContext) { + String errorMessage = e.getMessage(); + String errorMessageFormat = "Error occurred in the phase: '%s'. 
Error message: %s"; + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); + } +} diff --git a/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java new file mode 100644 index 0000000..a5f799b --- /dev/null +++ b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java @@ -0,0 +1,59 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.snowflake.common; + +import io.cdap.cdap.api.exception.ErrorType; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * Error Type provided based on the Snowflake error message code + * + **/ +public class SnowflakeErrorType { + + //https://github.com/snowflakedb/snowflake-jdbc/blob/master/src/main/java/net/snowflake/client/jdbc/ErrorCode.java + private static final Set USER_ERRORS = new HashSet<>(Arrays.asList( + 200004, 200006, 200007, 200008, 200009, 200010, 200011, 200012, 200014, + 200017, 200018, 200019, 200021, 200023, 200024, 200025, 200026, 200028, + 200029, 200030, 200031, 200032, 200033, 200034, 200035, 200036, 200037, + 200038, 200045, 200046, 200047, 200056 + )); + + private static final Set SYSTEM_ERRORS = new HashSet<>(Arrays.asList( + 200001, 200002, 200003, 200013, 200015, 200016, 200020, 200022, 200039, + 200040, 200044, 200061 + )); + + /** + * Method to get the error type based on the error code. + * + * @param errorCode the error code to classify + * @return the corresponding ErrorType (USER, SYSTEM, UNKNOWN) + */ + public static ErrorType getErrorType(int errorCode) { + if (USER_ERRORS.contains(errorCode)) { + return ErrorType.USER; + } else if (SYSTEM_ERRORS.contains(errorCode)) { + return ErrorType.SYSTEM; + } else { + return ErrorType.UNKNOWN; + } + } +} diff --git a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java index d74d85e..60a37fb 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java @@ -18,11 +18,19 @@ import com.google.common.base.Strings; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import 
io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.etl.api.exception.ErrorPhase; import io.cdap.plugin.common.KeyValueListParser; import io.cdap.plugin.snowflake.common.BaseSnowflakeConfig; import io.cdap.plugin.snowflake.common.OAuthUtil; +import io.cdap.plugin.snowflake.common.SnowflakeErrorDetailsProvider; +import io.cdap.plugin.snowflake.common.SnowflakeErrorType; import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; import io.cdap.plugin.snowflake.common.util.QueryUtil; +import net.snowflake.client.jdbc.ErrorCode; import net.snowflake.client.jdbc.SnowflakeBasicDataSource; import org.apache.http.impl.client.HttpClients; @@ -37,8 +45,10 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.stream.Collectors; /** * A class which accesses Snowflake API. @@ -88,7 +98,9 @@ public List describeQuery(String query) throws IOExcep fieldDescriptors.add(new SnowflakeFieldDescriptor(name, type, nullable)); } } catch (SQLException e) { - throw new IOException(e); + String errorMessage = "Error occurred while executing query to fetch descriptors. "; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.UNKNOWN, true, e); } return fieldDescriptors; } @@ -143,6 +155,12 @@ public void checkConnection() { connection.getMetaData(); } catch (SQLException e) { throw new ConnectionTimeoutException("Cannot create Snowflake connection.", e); + } catch (NullPointerException e) { + String errorMessage = "Cannot create Snowflake connection. 
Username or password is missing."; + throw ErrorUtils.getProgramFailureException( + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, + String.format("Error message: %s", errorMessage), + ErrorType.SYSTEM, true, e); } } // SnowflakeBasicDataSource doesn't provide access for additional properties. diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java index 436ae74..5dbd408 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java @@ -15,6 +15,9 @@ */ package io.cdap.plugin.snowflake.sink.batch; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -32,12 +35,12 @@ public class CSVBuffer implements Closeable { .withAllowMissingColumnNames(false); private CSVPrinter csvPrinter; - private ByteArrayOutputStream csvStream; + private final ByteArrayOutputStream csvStream; private boolean isHeaderPrinted; - private boolean printHeader; + private final boolean printHeader; private int recordsCount = 0; - public CSVBuffer(boolean printHeader) throws IOException { + public CSVBuffer(boolean printHeader) { this.printHeader = printHeader; this.csvStream = new ByteArrayOutputStream(); reset(); @@ -54,12 +57,21 @@ public void write(CSVRecord csvRecord) throws IOException { recordsCount++; } - public void reset() throws IOException { + public void reset() { isHeaderPrinted = !printHeader; recordsCount = 0; csvStream.reset(); // we need to re-create this or else OutputStreamWriter will not able to write after reset. 
- csvPrinter = new CSVPrinter(new OutputStreamWriter(csvStream, StandardCharsets.UTF_8), csvFormat); + try { + csvPrinter = new CSVPrinter(new OutputStreamWriter(csvStream, StandardCharsets.UTF_8), csvFormat); + } catch (IOException e) { + String errorMessage = String.format("Unable to reset the CSV stream and recreate the printer. " + + "This might occur due to issues with the OutputStreamWriter or the CSV format. " + + "Root cause: %s", e.getMessage() + ); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.UNKNOWN, true, e); + } } public int size() { diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java index 3c7fa86..6f982f1 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java @@ -29,7 +29,9 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.snowflake.common.SnowflakeErrorDetailsProvider; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,13 +76,16 @@ public void prepareRun(BatchSinkContext context) { LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName()); lineageRecorder.createExternalDataset(inputSchema); // Record the field level WriteOperation - if (inputSchema.getFields() != null && !inputSchema.getFields().isEmpty()) { + if (inputSchema != null && inputSchema.getFields() != null && !inputSchema.getFields().isEmpty()) { String operationDescription = String.format("Wrote to Snowflake table 
'%s'", config.getTableName()); lineageRecorder.recordWrite("Write", operationDescription, inputSchema.getFields().stream() .map(Schema.Field::getName) .collect(Collectors.toList())); } + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(SnowflakeErrorDetailsProvider.class.getName())); + } @Override @@ -90,9 +95,9 @@ public void initialize(BatchRuntimeContext context) throws Exception { } @Override - public void transform(StructuredRecord record, Emitter> emitter) + public void transform(StructuredRecord structuredRecord, Emitter> emitter) throws IOException { - CSVRecord csvRecord = transformer.transform(record); + CSVRecord csvRecord = transformer.transform(structuredRecord); emitter.emit(new KeyValue<>(null, csvRecord)); } } diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java index 41d5613..5bea75e 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java @@ -17,6 +17,9 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -43,9 +46,8 @@ public class SnowflakeOutputFormat extends OutputFormat public static final String DESTINATION_STAGE_PATH_PROPERTY = "cdap.dest.stage.path"; @Override - public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) - throws IOException { - return new SnowflakeRecordWriter(taskAttemptContext); + public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) { + return new SnowflakeRecordWriter(taskAttemptContext); } @Override @@ -64,11 +66,11 @@ 
public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) public void setupJob(JobContext jobContext) { Configuration conf = jobContext.getConfiguration(); conf.set(DESTINATION_STAGE_PATH_PROPERTY, DESTINATION_STAGE_PATH); - LOG.info(String.format("Writing data to '%s'", DESTINATION_STAGE_PATH)); + LOG.info("Writing data to '{}'", DESTINATION_STAGE_PATH); } @Override - public void commitJob(JobContext jobContext) throws IOException { + public void commitJob(JobContext jobContext) { Configuration conf = jobContext.getConfiguration(); String configJson = conf.get( SnowflakeOutputFormatProvider.PROPERTY_CONFIG_JSON); @@ -78,8 +80,17 @@ public void commitJob(JobContext jobContext) throws IOException { String destinationStagePath = conf.get(DESTINATION_STAGE_PATH_PROPERTY); SnowflakeSinkAccessor snowflakeAccessor = new SnowflakeSinkAccessor(config); - snowflakeAccessor.populateTable(destinationStagePath); - snowflakeAccessor.removeDirectory(destinationStagePath); + try { + snowflakeAccessor.populateTable(destinationStagePath); + snowflakeAccessor.removeDirectory(destinationStagePath); + } catch (IOException e) { + String errorMessage = String.format( + "Error committing job: Failed to populate table and remove directory at path '%s'.", + destinationStagePath + ); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } } @Override diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java index 9434481..e9121b3 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java @@ -16,6 +16,9 @@ package io.cdap.plugin.snowflake.sink.batch; import com.google.gson.Gson; +import 
io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; @@ -41,7 +44,7 @@ public class SnowflakeRecordWriter extends RecordWriter private final SnowflakeSinkAccessor snowflakeAccessor; private final String destinationStagePath; - public SnowflakeRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException { + public SnowflakeRecordWriter(TaskAttemptContext taskAttemptContext) { Configuration conf = taskAttemptContext.getConfiguration(); destinationStagePath = conf.get(SnowflakeOutputFormat.DESTINATION_STAGE_PATH_PROPERTY); String configJson = conf.get( @@ -55,21 +58,40 @@ public SnowflakeRecordWriter(TaskAttemptContext taskAttemptContext) throws IOExc } @Override - public void write(NullWritable key, CSVRecord csvRecord) throws IOException { + public void write(NullWritable key, CSVRecord csvRecord) { csvBufferSizeCheck.reset(); - csvBufferSizeCheck.write(csvRecord); + try { + csvBufferSizeCheck.write(csvRecord); + } catch (IOException e) { + String errorMessage = String.format("Unable to write CSV record in the size check buffer. Record: %s", csvRecord); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } if (config.getMaxFileSize() > 0 && csvBuffer.size() + csvBufferSizeCheck.size() > config.getMaxFileSize()) { submitCurrentBatch(); } - csvBuffer.write(csvRecord); + try { + csvBuffer.write(csvRecord); + } catch (IOException e) { + String errorMessage = String.format("Unable to write CSV record in the main buffer. 
Record: %s", csvRecord); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } + } - private void submitCurrentBatch() throws IOException { + private void submitCurrentBatch() { if (csvBuffer.getRecordsCount() != 0) { try (InputStream csvInputStream = new ByteArrayInputStream(csvBuffer.getByteArray())) { snowflakeAccessor.uploadStream(csvInputStream, destinationStagePath); + } catch (IOException e) { + String errorMessage = String.format("Failed to upload file to the destination stage '%s'. Error: %s", + destinationStagePath, e.getMessage() + ); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); } csvBuffer.reset(); @@ -77,7 +99,7 @@ private void submitCurrentBatch() throws IOException { } @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException { + public void close(TaskAttemptContext taskAttemptContext) { submitCurrentBatch(); } } diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java index 0eba82d..8a77d0d 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java @@ -16,7 +16,14 @@ package io.cdap.plugin.snowflake.sink.batch; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.plugin.snowflake.common.SnowflakeErrorType; import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor; +import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; 
+import net.snowflake.client.jdbc.ErrorCode; import net.snowflake.client.jdbc.SnowflakeConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +31,10 @@ import java.io.InputStream; import java.sql.Connection; import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; /** * A class which accesses Snowflake API to do actions used by batch sink. @@ -45,7 +55,7 @@ public SnowflakeSinkAccessor(SnowflakeSinkConfig config) { this.config = config; } - public void uploadStream(InputStream inputStream, String stageDir) throws IOException { + public void uploadStream(InputStream inputStream, String stageDir) { // file name needs to be unique across all the nodes. String filename = String.format(DEST_FILE_NAME, UUID.randomUUID().toString()); LOG.info("Uploading file '{}' to table stage", filename); @@ -55,14 +65,32 @@ public void uploadStream(InputStream inputStream, String stageDir) throws IOExce null, inputStream, filename, true); } catch (SQLException e) { - throw new IOException(e); + String errorMessage = String.format( + "Unable to compress and upload data to destination stage '%s'. Filename: '%s'. Error: %s", + stageDir, filename, e.getMessage() + ); + List errorCodes = Arrays.stream(ErrorCode.values()) + .filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState())).collect(Collectors.toList()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), + errorCodes.isEmpty() ? 
ErrorType.UNKNOWN : SnowflakeErrorType.getErrorType(errorCodes.get(0).getMessageCode()), + true, ErrorCodeType.SQLSTATE, e.getSQLState(), + "https://docs.snowflake.com/en/user-guide/client-connectivity-troubleshooting/error-messages", e); } } - public void populateTable(String destinationStagePath) throws IOException { + public void populateTable(String destinationStagePath) { String populateStatement = String.format(POPULATE_TABLE_STAGE, config.getTableName(), destinationStagePath, config.getCopyOptions()); - runSQL(populateStatement); + try { + runSQL(populateStatement); + } catch (IOException e) { + String errorMessage = String.format("Unable to copy information. " + + "Failed to populate table '%s' from the source stage path '%s' with the provided options: '%s'. ", + config.getTableName(), destinationStagePath, config.getCopyOptions()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } } public void removeDirectory(String path) throws IOException { diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java index 49ec955..dd8cac6 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java @@ -19,6 +19,9 @@ import com.google.gson.JsonParser; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.format.StructuredRecordStringConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,7 @@ import 
java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -39,11 +43,11 @@ public class StructuredRecordToCSVRecordTransformer { private static final Logger LOG = LoggerFactory.getLogger(StructuredRecordToCSVRecordTransformer.class); - public CSVRecord transform(StructuredRecord record) throws IOException { + public CSVRecord transform(StructuredRecord record) { List fieldNames = new ArrayList<>(); List values = new ArrayList<>(); - for (Schema.Field field : record.getSchema().getFields()) { + for (Schema.Field field : Objects.requireNonNull(Objects.requireNonNull(record.getSchema()).getFields())) { String fieldName = field.getName(); String value = convertSchemaFieldToString(record.get(fieldName), field, record); @@ -63,8 +67,7 @@ public CSVRecord transform(StructuredRecord record) throws IOException { * @return string representing the value in format, which can be understood by Snowflake */ @Nullable - public static String convertSchemaFieldToString(Object value, Schema.Field field, StructuredRecord record) - throws IOException { + public static String convertSchemaFieldToString(Object value, Schema.Field field, StructuredRecord record) { // don't convert null to avoid NPE if (value == null) { return null; @@ -100,7 +103,7 @@ public static String convertSchemaFieldToString(Object value, Schema.Field field instant = Instant.ofEpochMilli((Long) value); return instant.atZone(ZoneOffset.UTC).toLocalTime().toString(); case DECIMAL: - return record.getDecimal(field.getName()).toString(); + return Objects.requireNonNull(record.getDecimal(field.getName())).toString(); default: throw new IllegalArgumentException( String.format("Field '%s' is of unsupported type '%s'", fieldSchema.getDisplayName(), @@ -111,12 +114,31 @@ public static String convertSchemaFieldToString(Object value, Schema.Field field switch (fieldSchema.getType()) { // convert to json so it 
can be saved to Snowflake's variant case RECORD: - return StructuredRecordStringConverter.toJsonString((StructuredRecord) value); - // convert to json so it can be saved to Snowflake's variant + try { + return StructuredRecordStringConverter.toJsonString((StructuredRecord) value); + } catch (IOException e) { + String errorMessage = String.format( + "Failed to encode record to JSON for schema field '%s' of type RECORD. Cause: %s", + field.getName(), e.getMessage() + ); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } + // convert to json so it can be saved to Snowflake's variant case ARRAY: - String stringRecord = StructuredRecordStringConverter.toJsonString(record); - JsonElement jsonObject = new JsonParser().parse(stringRecord); - return jsonObject.getAsJsonObject().get(field.getName()).toString(); + String stringRecord; + try { + stringRecord = StructuredRecordStringConverter.toJsonString(record); + JsonElement jsonObject = new JsonParser().parse(stringRecord); + return jsonObject.getAsJsonObject().get(field.getName()).toString(); + } catch (IOException e) { + String errorMessage = String.format( + "Failed to encode record to JSON for schema field '%s' of type ARRAY. 
Cause: %s", + field.getName(), e.getMessage() + ); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } // convert to hex which can be understood by Snowflake and saved to BINARY type case BYTES: byte[] bytes; diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java index f5fde15..7865bf9 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java @@ -30,7 +30,9 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.snowflake.common.SnowflakeErrorDetailsProvider; import io.cdap.plugin.snowflake.common.util.SchemaHelper; import org.apache.hadoop.io.NullWritable; @@ -88,6 +90,8 @@ public void prepareRun(BatchSourceContext context) { } context.setInput(Input.of(config.getReferenceName(), new SnowflakeInputFormatProvider(config, escapeChar))); + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(SnowflakeErrorDetailsProvider.class.getName())); } @Override diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java index 9cd9ac8..b28b43c 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java @@ -19,9 +19,13 @@ import io.cdap.cdap.api.annotation.Description; import 
io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.snowflake.common.BaseSnowflakeConfig; + +import java.util.Objects; import javax.annotation.Nullable; + /** * This class {@link SnowflakeBatchSourceConfig} provides all the configuration required for * configuring the Source plugin. @@ -83,4 +87,15 @@ public String getReferenceName() { public String getSchema() { return schema; } + + public void validate(FailureCollector collector) { + super.validate(collector); + + if (!containsMacro(PROPERTY_MAX_SPLIT_SIZE) && Objects.nonNull(maxSplitSize) + && maxSplitSize < 0) { + collector.addFailure("Maximum Split Size cannot be a negative number.", null) + .withConfigProperty(PROPERTY_MAX_SPLIT_SIZE); + } + } + } diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java index c4c1d1b..c0cfdcf 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java @@ -19,6 +19,9 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -39,7 +42,7 @@ public class SnowflakeInputFormat extends InputFormat { private static final Gson GSON = new Gson(); @Override - public List getSplits(JobContext jobContext) throws IOException { + public List getSplits(JobContext jobContext) { SnowflakeSourceAccessor snowflakeAccessor = getSnowflakeAccessor(jobContext.getConfiguration()); List stageSplits = snowflakeAccessor.prepareStageSplits(); return stageSplits.stream() diff --git 
a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java index d8990c0..d541412 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java @@ -30,6 +30,7 @@ import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -55,7 +56,8 @@ private StructuredRecord getStructuredRecord(Map row, Schema sch .filter(entry -> schema.getField(entry.getKey()) != null) // filter absent fields in the schema .forEach(entry -> builder.set( entry.getKey(), - convertValue(entry.getKey(), entry.getValue(), schema.getField(entry.getKey()).getSchema()))); + convertValue(entry.getKey(), entry.getValue(), + Objects.requireNonNull(schema.getField(entry.getKey())).getSchema()))); return builder.build(); } @@ -105,11 +107,12 @@ private Object convertValue(String fieldName, String value, Schema fieldSchema) return Double.parseDouble(castValue(value, fieldName, String.class)); case STRING: return value; - } + default: + throw new UnexpectedFormatException( + String.format("Unsupported schema type: '%s' for field: '%s'. Supported types are 'bytes, boolean, " + + "double, string'.", fieldSchema, fieldName)); - throw new UnexpectedFormatException( - String.format("Unsupported schema type: '%s' for field: '%s'. 
Supported types are 'bytes, boolean, " - + "double, string'.", fieldSchema, fieldName)); + } } private static T castValue(Object value, String fieldName, Class clazz) { diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java index bc8c922..860517a 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java @@ -17,9 +17,15 @@ package io.cdap.plugin.snowflake.source.batch; import au.com.bytecode.opencsv.CSVReader; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.plugin.snowflake.common.SnowflakeErrorType; import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor; import io.cdap.plugin.snowflake.common.util.QueryUtil; import io.cdap.plugin.snowflake.sink.batch.SnowflakeSinkAccessor; +import net.snowflake.client.jdbc.ErrorCode; import net.snowflake.client.jdbc.SnowflakeConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,14 +37,16 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; /** * A class which accesses Snowflake API to do actions used by batch source. */ public class SnowflakeSourceAccessor extends SnowflakeAccessor { - private static final Logger LOG = LoggerFactory.getLogger(SnowflakeSinkAccessor.class); + private static final Logger LOG = LoggerFactory.getLogger(SnowflakeSourceAccessor.class); // Directory should be unique, so that parallel pipelines can run correctly, as well as after failure we don't // have old stage files in the dir. 
private static final String STAGE_PATH = "@~/cdap_stage/result" + UUID.randomUUID() + "/"; @@ -74,7 +82,7 @@ public SnowflakeSourceAccessor(SnowflakeBatchSourceConfig config, String escapeC * @return List of file paths in Snowflake stage. * @throws IOException thrown if there are any issue with the I/O operations. */ - public List prepareStageSplits() throws IOException { + public List prepareStageSplits() { LOG.info("Loading data into stage: '{}'", STAGE_PATH); String copy = String.format(COMAND_COPY_INTO, QueryUtil.removeSemicolon(config.getImportQuery())); if (config.getMaxSplitSize() > 0) { @@ -92,7 +100,15 @@ public List prepareStageSplits() throws IOException { } } } catch (SQLException e) { - throw new IOException(e); + String errorMessage = String.format("Failed to load data into stage '%s'. Error executing query: %s", + STAGE_PATH, e.getMessage()); + List errorCodes = Arrays.stream(ErrorCode.values()) + .filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState())).collect(Collectors.toList()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), + errorCodes.isEmpty() ? ErrorType.UNKNOWN : SnowflakeErrorType.getErrorType(errorCodes.get(0).getMessageCode()), + true, ErrorCodeType.SQLSTATE, e.getSQLState(), + "https://docs.snowflake.com/en/user-guide/client-connectivity-troubleshooting/error-messages", e); } return stageSplits; } @@ -102,8 +118,16 @@ public List prepareStageSplits() throws IOException { * @param stageSplit path to file in Snowflake stage. * @throws IOException hrown if there are any issue with the I/O operations. 
*/ - public void removeStageFile(String stageSplit) throws IOException { - runSQL(String.format("remove @~/%s", stageSplit)); + public void removeStageFile(String stageSplit) { + try { + runSQL(String.format("remove @~/%s", stageSplit)); + } catch (IOException e) { + String errorMessage = String.format("Failed to execute the SQL remove command for stage file: %s. Error: %s", + stageSplit, e.getMessage() + ); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, String.format("Error message: %s", errorMessage), ErrorType.SYSTEM, true, e); + } } /** @@ -120,7 +144,12 @@ public CSVReader buildCsvReader(String stageSplit) throws IOException { InputStreamReader inputStreamReader = new InputStreamReader(downloadStream); return new CSVReader(inputStreamReader, ',', '"', escapeChar); } catch (SQLException e) { - throw new IOException(e); + String errorMessage = String.format( + "Failed to execute the query due to an SQL exception. Stage Split: %s. Reason: %s", + stageSplit, e.getMessage() + ); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, e.getMessage(), ErrorType.SYSTEM, true, new IOException(e)); } } }