[SPARK-49732][CORE][K8S] Spark daemons should respect `spark.log.structuredLogging.enabled` conf

### What changes were proposed in this pull request?

Explicitly call `Logging.uninitialize()` after `SparkConf` loads `spark-defaults.conf`.

### Why are the changes needed?

SPARK-49015 fixed a similar issue affecting services started through `SparkSubmit`. For other services such as the SHS (Spark History Server), there is still a chance that the logging system is initialized before the `SparkConf` is constructed, so `spark.log.structuredLogging.enabled` configured in `spark-defaults.conf` does not take effect.

The issue only happens when the logging system is initialized before `SparkConf` loads `spark-defaults.conf`.
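The ordering dependency can be sketched with a minimal, self-contained model. `MiniLogging` below is entirely hypothetical; in Spark the same roles are played by `Logging`, `SparkConf`, and the `Utils.resetStructuredLogging` helper this commit adds:

```scala
// Hypothetical model of the init-ordering bug: the first log call freezes
// the output format (as Log4j initialization does), so a conf value that
// arrives afterwards is silently ignored.
object MiniLogging {
  private var initialized = false
  private var activeStructured = true // format frozen at first use
  var structuredEnabled = true        // pending value, e.g. from spark-defaults.conf

  def log(msg: String): String = {
    if (!initialized) {
      activeStructured = structuredEnabled // freeze the format now
      initialized = true
    }
    if (activeStructured) s"""{"msg":"$msg"}""" else msg
  }

  // The fix: drop the initialized state so the next log call
  // re-reads the (now conf-backed) setting.
  def uninitialize(): Unit = initialized = false
}
```

An early log call freezes structured output; setting `structuredEnabled = false` afterwards changes nothing until `uninitialize()` is called — which is the sequence this commit inserts right after `Utils.loadDefaultSparkProperties`.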

Example 1 (#47500 (comment)): when `java.net.InetAddress.getLocalHost` returns `127.0.0.1`,

```
scala> java.net.InetAddress.getLocalHost
res0: java.net.InetAddress = H27212-MAC-01.local/127.0.0.1
```

the logging system will be initialized early.
```
{"ts":"2024-09-22T12:50:37.082Z","level":"WARN","msg":"Your hostname, H27212-MAC-01.local, resolves to a loopback address: 127.0.0.1; using 192.168.32.130 instead (on interface en0)","context":{"host":"H27212-MAC-01.local","host_port":"127.0.0.1","host_port2":"192.168.32.130","network_if":"en0"},"logger":"Utils"}
{"ts":"2024-09-22T12:50:37.085Z","level":"WARN","msg":"Set SPARK_LOCAL_IP if you need to bind to another address","logger":"Utils"}
```

Example 2: the SHS calls `Utils.initDaemon(log)` before loading `spark-defaults.conf` (which happens inside the construction of `HistoryServerArguments`)

https://github.com/apache/spark/blob/d2e8c1cb60e34a1c7e92374c07d682aa5ca79145/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala#L301-L302

```
{"ts":"2024-09-22T13:20:31.978Z","level":"INFO","msg":"Started daemon with process name: 41505H27212-MAC-01.local","logger":"HistoryServer"}
{"ts":"2024-09-22T13:20:31.980Z","level":"INFO","msg":"Registering signal handler for TERM","logger":"SignalUtils"}
{"ts":"2024-09-22T13:20:31.981Z","level":"INFO","msg":"Registering signal handler for HUP","logger":"SignalUtils"}
{"ts":"2024-09-22T13:20:31.981Z","level":"INFO","msg":"Registering signal handler for INT","logger":"SignalUtils"}
```
and only then loads `spark-defaults.conf`, ignoring `spark.log.structuredLogging.enabled`.

### Does this PR introduce _any_ user-facing change?

No. Spark structured logging is an unreleased feature.

### How was this patch tested?

Set `spark.log.structuredLogging.enabled=false` in `spark-defaults.conf`, then start the History Server.

With 4.0.0-preview2, the setting is ignored and structured JSON logs continue:
```
$ SPARK_NO_DAEMONIZE=1 sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /Users/chengpan/app/spark-4.0.0-preview2-bin-hadoop3/logs/spark-chengpan-org.apache.spark.deploy.history.HistoryServer-1-H27212-MAC-01.local.out
Spark Command: /Users/chengpan/.sdkman/candidates/java/current/bin/java -cp /Users/chengpan/app/spark-4.0.0-preview2-bin-hadoop3/conf/:/Users/chengpan/app/spark-4.0.0-preview2-bin-hadoop3/jars/slf4j-api-2.0.16.jar:/Users/chengpan/app/spark-4.0.0-preview2-bin-hadoop3/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
{"ts":"2024-09-22T12:50:37.082Z","level":"WARN","msg":"Your hostname, H27212-MAC-01.local, resolves to a loopback address: 127.0.0.1; using 192.168.32.130 instead (on interface en0)","context":{"host":"H27212-MAC-01.local","host_port":"127.0.0.1","host_port2":"192.168.32.130","network_if":"en0"},"logger":"Utils"}
{"ts":"2024-09-22T12:50:37.085Z","level":"WARN","msg":"Set SPARK_LOCAL_IP if you need to bind to another address","logger":"Utils"}
{"ts":"2024-09-22T12:50:37.109Z","level":"INFO","msg":"Started daemon with process name: 37764H27212-MAC-01.local","logger":"HistoryServer"}
{"ts":"2024-09-22T12:50:37.112Z","level":"INFO","msg":"Registering signal handler for TERM","logger":"SignalUtils"}
{"ts":"2024-09-22T12:50:37.112Z","level":"INFO","msg":"Registering signal handler for HUP","logger":"SignalUtils"}
{"ts":"2024-09-22T12:50:37.112Z","level":"INFO","msg":"Registering signal handler for INT","logger":"SignalUtils"}
{"ts":"2024-09-22T12:50:37.258Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... using builtin-java classes where applicable","logger":"NativeCodeLoader"}
{"ts":"2024-09-22T12:50:37.275Z","level":"INFO","msg":"Changing view acls to: chengpan","logger":"SecurityManager"}
{"ts":"2024-09-22T12:50:37.275Z","level":"INFO","msg":"Changing modify acls to: chengpan","logger":"SecurityManager"}
{"ts":"2024-09-22T12:50:37.276Z","level":"INFO","msg":"Changing view acls groups to: chengpan","logger":"SecurityManager"}
{"ts":"2024-09-22T12:50:37.276Z","level":"INFO","msg":"Changing modify acls groups to: chengpan","logger":"SecurityManager"}
{"ts":"2024-09-22T12:50:37.277Z","level":"INFO","msg":"SecurityManager: authentication disabled; ui acls disabled; users with view permissions: chengpan groups with view permissions: EMPTY; users with modify permissions: chengpan; groups with modify permissions: EMPTY; RPC SSL disabled","logger":"SecurityManager"}
{"ts":"2024-09-22T12:50:37.309Z","level":"INFO","msg":"History server ui acls disabled; users with admin permissions: ; groups with admin permissions: ","logger":"FsHistoryProvider"}
{"ts":"2024-09-22T12:50:37.409Z","level":"INFO","msg":"Start Jetty 0.0.0.0:18080 for HistoryServerUI","logger":"JettyUtils"}
{"ts":"2024-09-22T12:50:37.466Z","level":"INFO","msg":"Successfully started service 'HistoryServerUI' on port 18080.","logger":"Utils"}
{"ts":"2024-09-22T12:50:37.491Z","level":"INFO","msg":"Bound HistoryServer to 0.0.0.0, and started at http://192.168.32.130:18080","logger":"HistoryServer"}
...
```

With this PR, once `spark-defaults.conf` is loaded the logging system is re-initialized with the plain pattern layout:
```
$ SPARK_NO_DAEMONIZE=1 sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /Users/chengpan/Projects/apache-spark/dist/logs/spark-chengpan-org.apache.spark.deploy.history.HistoryServer-1-H27212-MAC-01.local.out
Spark Command: /Users/chengpan/.sdkman/candidates/java/current/bin/java -cp /Users/chengpan/Projects/apache-spark/dist/conf/:/Users/chengpan/Projects/apache-spark/dist/jars/slf4j-api-2.0.16.jar:/Users/chengpan/Projects/apache-spark/dist/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
{"ts":"2024-09-22T13:20:31.903Z","level":"WARN","msg":"Your hostname, H27212-MAC-01.local, resolves to a loopback address: 127.0.0.1; using 192.168.32.130 instead (on interface en0)","context":{"host":"H27212-MAC-01.local","host_port":"127.0.0.1","host_port2":"192.168.32.130","network_if":"en0"},"logger":"Utils"}
{"ts":"2024-09-22T13:20:31.905Z","level":"WARN","msg":"Set SPARK_LOCAL_IP if you need to bind to another address","logger":"Utils"}
{"ts":"2024-09-22T13:20:31.978Z","level":"INFO","msg":"Started daemon with process name: 41505H27212-MAC-01.local","logger":"HistoryServer"}
{"ts":"2024-09-22T13:20:31.980Z","level":"INFO","msg":"Registering signal handler for TERM","logger":"SignalUtils"}
{"ts":"2024-09-22T13:20:31.981Z","level":"INFO","msg":"Registering signal handler for HUP","logger":"SignalUtils"}
{"ts":"2024-09-22T13:20:31.981Z","level":"INFO","msg":"Registering signal handler for INT","logger":"SignalUtils"}
{"ts":"2024-09-22T13:20:32.136Z","level":"WARN","msg":"Unable to load native-hadoop library for your platform... using builtin-java classes where applicable","logger":"NativeCodeLoader"}
Using Spark's default log4j profile: org/apache/spark/log4j2-pattern-layout-defaults.properties
24/09/22 21:20:32 INFO SecurityManager: Changing view acls to: chengpan
24/09/22 21:20:32 INFO SecurityManager: Changing modify acls to: chengpan
24/09/22 21:20:32 INFO SecurityManager: Changing view acls groups to: chengpan
24/09/22 21:20:32 INFO SecurityManager: Changing modify acls groups to: chengpan
24/09/22 21:20:32 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: chengpan groups with view permissions: EMPTY; users with modify permissions: chengpan; groups with modify permissions: EMPTY; RPC SSL disabled
24/09/22 21:20:32 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
24/09/22 21:20:32 INFO JettyUtils: Start Jetty 0.0.0.0:18080 for HistoryServerUI
24/09/22 21:20:32 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
24/09/22 21:20:32 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, and started at http://192.168.32.130:18080
...
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48198 from pan3793/SPARK-49732.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
pan3793 authored and dongjoon-hyun committed Sep 30, 2024
1 parent d68048b commit 1233611
Showing 8 changed files with 35 additions and 5 deletions.
```diff
@@ -169,6 +169,9 @@ object ExternalShuffleService extends Logging {
     Utils.initDaemon(log)
     val sparkConf = new SparkConf
     Utils.loadDefaultSparkProperties(sparkConf)
+    // Initialize logging system again after `spark.log.structuredLogging.enabled` takes effect
+    Utils.resetStructuredLogging(sparkConf)
+    Logging.uninitialize()
     val securityManager = new SecurityManager(sparkConf)

     // we override this value since this service is started from the command line
```
```diff
@@ -79,11 +79,7 @@ private[spark] class SparkSubmit extends Logging {
     } else {
       // For non-shell applications, enable structured logging if it's not explicitly disabled
       // via the configuration `spark.log.structuredLogging.enabled`.
-      if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = true)) {
-        Logging.enableStructuredLogging()
-      } else {
-        Logging.disableStructuredLogging()
-      }
+      Utils.resetStructuredLogging(sparkConf)
     }

     // We should initialize log again after `spark.log.structuredLogging.enabled` effected
```
```diff
@@ -53,6 +53,9 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[String])

   // This mutates the SparkConf, so all accesses to it must be made after this line
   Utils.loadDefaultSparkProperties(conf, propertiesFile)
+  // Initialize logging system again after `spark.log.structuredLogging.enabled` takes effect
+  Utils.resetStructuredLogging(conf)
+  Logging.uninitialize()

   // scalastyle:off line.size.limit println
   private def printUsageAndExit(exitCode: Int, error: String = ""): Unit = {
```
```diff
@@ -53,6 +53,9 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) exte

   // This mutates the SparkConf, so all accesses to it must be made after this line
   propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+  // Initialize logging system again after `spark.log.structuredLogging.enabled` takes effect
+  Utils.resetStructuredLogging(conf)
+  Logging.uninitialize()

   if (conf.contains(MASTER_UI_PORT.key)) {
     webUiPort = conf.get(MASTER_UI_PORT)
```
```diff
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory
 import scala.annotation.tailrec

 import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.util.{IntParam, MemoryParam, Utils}

@@ -59,6 +60,9 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {

   // This mutates the SparkConf, so all accesses to it must be made after this line
   propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+  // Initialize logging system again after `spark.log.structuredLogging.enabled` takes effect
+  Utils.resetStructuredLogging(conf)
+  Logging.uninitialize()

   conf.get(WORKER_UI_PORT).foreach { webUiPort = _ }
```
```diff
@@ -468,6 +468,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
     }

+    // Initialize logging system again after `spark.log.structuredLogging.enabled` takes effect
+    Utils.resetStructuredLogging(driverConf)
+    Logging.uninitialize()
+
     cfg.hadoopDelegationCreds.foreach { tokens =>
       SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
     }
```
`core/src/main/scala/org/apache/spark/util/Utils.scala` (13 additions, 0 deletions):

```diff
@@ -2673,6 +2673,19 @@ private[spark] object Utils
     }
   }

+  /**
+   * Utility function to enable or disable structured logging based on SparkConf.
+   * This is designed for code paths where the logging system may be initialized before
+   * the SparkConf is loaded.
+   */
+  def resetStructuredLogging(sparkConf: SparkConf): Unit = {
+    if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = true)) {
+      Logging.enableStructuredLogging()
+    } else {
+      Logging.disableStructuredLogging()
+    }
+  }
+
   /**
    * Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
    * these jars through file server. In the YARN mode, it will return an empty list, since YARN
```
```diff
@@ -116,6 +116,10 @@ private[spark] object KubernetesExecutorBackend extends Logging {
       }
     }

+    // Initialize logging system again after `spark.log.structuredLogging.enabled` takes effect
+    Utils.resetStructuredLogging(driverConf)
+    Logging.uninitialize()
+
     cfg.hadoopDelegationCreds.foreach { tokens =>
       SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
     }
```
