Skip to content

Commit

Permalink
[SPARK-18772][SQL] Avoid unnecessary conversion try for special float…
Browse files Browse the repository at this point in the history
…s in JSON

## What changes were proposed in this pull request?

This PR is based on  apache#16199 and extracts the valid change from apache#9759 to resolve SPARK-18772

This avoids additional conversion try with `toFloat` and `toDouble`.

For avoiding additional conversions, please refer the codes below:

**Before**

```scala
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> spark.read.schema(StructType(Seq(StructField("a", DoubleType)))).option("mode", "FAILFAST").json(Seq("""{"a": "nan"}""").toDS).show()
17/05/12 11:30:41 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NumberFormatException: For input string: "nan"
...
```

**After**

```scala
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> spark.read.schema(StructType(Seq(StructField("a", DoubleType)))).option("mode", "FAILFAST").json(Seq("""{"a": "nan"}""").toDS).show()
17/05/12 11:44:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Cannot parse nan as DoubleType.
...
```

## How was this patch tested?

Unit tests added in `JsonSuite`.

Closes apache#16199

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Nathan Howell <nhowell@godaddy.com>

Closes apache#17956 from HyukjinKwon/SPARK-18772.
  • Loading branch information
HyukjinKwon authored and liyichao committed May 24, 2017
1 parent dce2aaf commit 6729b75
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.sql.catalyst.json

import java.io.ByteArrayOutputStream
import java.util.Locale

import scala.collection.mutable.ArrayBuffer
import scala.util.Try
Expand Down Expand Up @@ -126,16 +125,11 @@ class JacksonParser(

case VALUE_STRING =>
// Special case handling for NaN and Infinity.
val value = parser.getText
val lowerCaseValue = value.toLowerCase(Locale.ROOT)
if (lowerCaseValue.equals("nan") ||
lowerCaseValue.equals("infinity") ||
lowerCaseValue.equals("-infinity") ||
lowerCaseValue.equals("inf") ||
lowerCaseValue.equals("-inf")) {
value.toFloat
} else {
throw new RuntimeException(s"Cannot parse $value as FloatType.")
parser.getText match {
case "NaN" => Float.NaN
case "Infinity" => Float.PositiveInfinity
case "-Infinity" => Float.NegativeInfinity
case other => throw new RuntimeException(s"Cannot parse $other as FloatType.")
}
}

Expand All @@ -146,16 +140,11 @@ class JacksonParser(

case VALUE_STRING =>
// Special case handling for NaN and Infinity.
val value = parser.getText
val lowerCaseValue = value.toLowerCase(Locale.ROOT)
if (lowerCaseValue.equals("nan") ||
lowerCaseValue.equals("infinity") ||
lowerCaseValue.equals("-infinity") ||
lowerCaseValue.equals("inf") ||
lowerCaseValue.equals("-inf")) {
value.toDouble
} else {
throw new RuntimeException(s"Cannot parse $value as DoubleType.")
parser.getText match {
case "NaN" => Double.NaN
case "Infinity" => Double.PositiveInfinity
case "-Infinity" => Double.NegativeInfinity
case other => throw new RuntimeException(s"Cannot parse $other as DoubleType.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.{File, StringWriter}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.util.Locale

import com.fasterxml.jackson.core.JsonFactory
import org.apache.hadoop.fs.{Path, PathFilter}
Expand Down Expand Up @@ -1988,4 +1989,43 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
}
}

test("SPARK-18772: Parse special floats correctly") {
val jsons = Seq(
"""{"a": "NaN"}""",
"""{"a": "Infinity"}""",
"""{"a": "-Infinity"}""")

// positive cases
val checks: Seq[Double => Boolean] = Seq(
_.isNaN,
_.isPosInfinity,
_.isNegInfinity)

Seq(FloatType, DoubleType).foreach { dt =>
jsons.zip(checks).foreach { case (json, check) =>
val ds = spark.read
.schema(StructType(Seq(StructField("a", dt))))
.json(Seq(json).toDS())
.select($"a".cast(DoubleType)).as[Double]
assert(check(ds.first()))
}
}

// negative cases
Seq(FloatType, DoubleType).foreach { dt =>
val lowerCasedJsons = jsons.map(_.toLowerCase(Locale.ROOT))
// The special floats are case-sensitive so these cases below throw exceptions.
lowerCasedJsons.foreach { lowerCasedJson =>
val e = intercept[SparkException] {
spark.read
.option("mode", "FAILFAST")
.schema(StructType(Seq(StructField("a", dt))))
.json(Seq(lowerCasedJson).toDS())
.collect()
}
assert(e.getMessage.contains("Cannot parse"))
}
}
}
}

0 comments on commit 6729b75

Please sign in to comment.