[BUG][Spark] Unable to operate on Delta table after applying large delete expression (>20MB) #2565

Open
biochimia opened this issue Jan 24, 2024 · 3 comments
Labels
bug Something isn't working


@biochimia

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

Spark 3.5 depends on Jackson 2.15, which introduces limits on processed JSON, notably a default 20 MB limit on the length of JSON string values.

When using Delta with Spark 3.5 (Jackson 2.15), it is possible to generate Delta log entries containing strings larger than this limit, and these log entries can then no longer be read back with the same version of Spark/Jackson.
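
For reference, the limit comes from Jackson's StreamReadConstraints, which default to a maximum JSON string value length of 20,000,000 characters. A minimal, Delta-independent sketch of how that default behaves (the exact exception surfaced through Spark's parser differs, as shown under "Observed results" below):

// Sketch only: demonstrates Jackson 2.15's default string-length constraint.
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper

object JacksonLimitDemo extends App {
  val factory = new JsonFactory()
  // Prints 20000000 with Jackson 2.15.x defaults.
  println(factory.streamReadConstraints().getMaxStringLength())

  // A "predicate" value larger than the default limit.
  val hugePredicate = "x" * 25000000
  val json = s"""{"commitInfo":{"operationParameters":{"predicate":"$hugePredicate"}}}"""

  // Expected to fail with a stream-constraints error, because the string value
  // exceeds the configured maximum length.
  new ObjectMapper(factory).readTree(json)
}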

In our case, we hit this when applying a horribly large deletion expression. The resulting commit entry looked like this:

{
  "commitInfo": {
    "timestamp": 1706058960723,
    "operation": "DELETE",
    "operationParameters": {
      "predicate": "[\" ... believe it or not we had a 30MB expression here ... \"]"
    },
    "readVersion": 123630,
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numRemovedFiles": "168",
      "numRemovedBytes": "16143805926",
      "numCopiedRows": "89096295",
      "numDeletionVectorsAdded": "0",
      "numDeletionVectorsRemoved": "0",
      "numAddedChangeFiles": "0",
      "executionTimeMs": "299904",
      "numDeletionVectorsUpdated": "0",
      "numDeletedRows": "7435",
      "scanTimeMs": "234196",
      "numAddedFiles": "168",
      "numAddedBytes": "16142503993",
      "rewriteTimeMs": "65707"
    },
    "engineInfo": "Apache-Spark/3.5.0 Delta-Lake/3.0.0",
    "txnId": "f1045f63-1040-4ad3-9630-de81b0d79e36"
  }
}

Steps to reproduce

  1. Build a horribly large filter expression, perhaps by concatenating a horribly large list of values: (tenant#75 = some_tenant) AND some_id#83 INSET lots, of, comma, separated, ids, ... (see the sketch after these steps).
  2. Delete all matching records to generate a new transaction with an impossibly large predicate string.
  3. Try to operate on the Delta table using Spark 3.5 / Jackson 2.15.
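
A rough, self-contained sketch of the steps above (the /tmp/repro_table path, column names, and row counts are hypothetical; the exact number and length of ids needed to push the recorded predicate past 20 MB will vary):

// Sketch only, assuming Spark 3.5 with Delta 3.0 on the classpath.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("delta-large-predicate-repro")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// Hypothetical table with `tenant` and `some_id` columns.
(1 to 1000000).map(i => ("some_tenant", s"very-long-identifier-$i"))
  .toDF("tenant", "some_id")
  .write.format("delta").mode("overwrite").save("/tmp/repro_table")

// 1. Build a horribly large IN-list predicate (illustrative sizes).
val lotsOfIds = (1 to 1000000).map(i => s"very-long-identifier-$i")
val predicate = col("tenant") === "some_tenant" && col("some_id").isin(lotsOfIds: _*)

// 2. Delete all matching records; the predicate string ends up in the
//    operationParameters of the commit written to _delta_log.
DeltaTable.forPath(spark, "/tmp/repro_table").delete(predicate)

// 3. Any later operation that replays that log entry fails to parse it
//    under Spark 3.5 / Jackson 2.15.
spark.read.format("delta").load("/tmp/repro_table").count()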

Observed results

2024/01/23 17:49:56.926 ERROR spark.sql.delta.util.NonFateSharingFuture: Failed to get result from future
scala.runtime.NonLocalReturnControl: null
2024/01/23 17:49:57.261 WARN spark.sql.catalyst.util.SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2024/01/23 17:49:58.346 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 12.0 in stage 15.0 (TID 3106) (10.0.50.17 executor 2): org.apache.spark.SparkException: Encountered error while reading file s3a://<Delta Table S3 Path>/_delta_log/00000000000000123617.json. Details:
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:863)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [null,null,null,null,null,null,null,null,null,null].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1610)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$9(JsonDataSource.scala:143)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
	... 20 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('a' (code 97)): was expecting comma to separate Object entries
 at [Source: (byte[])"{"commitInfo":{"timestamp":1706018230213,"operation":"DELETE","operationParameters":{"predicate":"[\"<Part of Horribly Large Filter Expression>"[truncated 30381161 bytes]; line: 1, column: 20051015]
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:594)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$7(JsonDataSource.scala:139)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 26 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('a' (code 97)): was expecting comma to separate Object entries
 at [Source: (byte[])"{"commitInfo":{"timestamp":1706018230213,"operation":"DELETE","operationParameters":{"predicate":"[\"<Part of Horribly Large Filter Expression"[truncated 30381161 bytes]; line: 1, column: 20051015]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:750)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:674)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:779)
	at org.apache.spark.sql.catalyst.json.JacksonUtils$.nextUntil(JacksonUtils.scala:32)
	at org.apache.spark.sql.catalyst.json.JacksonParser.org$apache$spark$sql$catalyst$json$JacksonParser$$convertObject(JacksonParser.scala:453)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:123)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:122)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:404)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeStructRootConverter$3(JacksonParser.scala:122)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$parse$2(JacksonParser.scala:582)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource(SparkErrorUtils.scala:48)
	at org.apache.spark.util.SparkErrorUtils.tryWithResource$(SparkErrorUtils.scala:46)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:94)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:577)
	... 28 more

Expected results

The Delta table loads without issue and the job proceeds as usual.

Further details

Jackson 2.15 introduced a way to change the default limits (via StreamReadConstraints), and this is also available in Spark 3.5, but taking advantage of it requires explicitly integrating with those versions of Spark/Jackson.
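
For illustration only, Jackson 2.15's override hook looks like the sketch below; whether Spark's JSON data source actually honors it depends on when its JsonFactory instances are created, which is exactly the explicit-integration problem mentioned above:

// Sketch only: raises Jackson's process-wide default max string length.
// Must run before any JsonFactory that should pick it up is constructed.
import com.fasterxml.jackson.core.StreamReadConstraints

StreamReadConstraints.overrideDefaultStreamReadConstraints(
  StreamReadConstraints.builder()
    .maxStringLength(100000000) // e.g. 100 million chars instead of the 20 million default
    .build()
)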

We already have an action on our side to avoid the use of such long filter expressions 😓. That said, upgrading to the latest Spark/Jackson brings along this lingering issue, which can affect others, and it may leave limited options for recovery, since the upgrade may happen long before the issue surfaces.

In our case, we upgraded this particular job over a month ago and were only hit by the issue now. Looking back, our horrible filter expressions managed to stay just under the 20 MB limit until the latest one blew up to 30 MB.

We were able to fix our job by reverting to Spark 3.4 / Jackson 2.14 / Delta 2.4. We're looking into what can be done in Spark/Delta to avoid the issue, or to make the string length limit configurable when operating on Delta tables that have such large predicates in their transaction log entries.

Environment information

  • Delta Lake version: 3.0.0
  • Spark version: 3.5.0
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
@biochimia biochimia added the bug Something isn't working label Jan 24, 2024
@felipepessoto
Contributor

We may need a fix similar to #2514

@orsagiv94

Has anyone managed to overcome this issue?
It seems like none of the suggested options (PERMISSIVE, DROPMALFORMED) allow us to get past Delta log files larger than 20 MB.

@lepiaf

lepiaf commented Jul 17, 2024

Hello, we have the same issue. Do we know which release will include this fix?
