[SPARK-45351][CORE] Change spark.shuffle.service.db.backend default value to ROCKSDB

### What changes were proposed in this pull request?
Spark will remove LevelDB support in the future, so this PR changes the default value of `spark.shuffle.service.db.backend` from `LEVELDB` to `ROCKSDB`.

### Why are the changes needed?
To prepare for the removal of LevelDB.

### Does this PR introduce _any_ user-facing change?
Yes. `ROCKSDB` becomes the default value of `spark.shuffle.service.db.backend`.
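
Users who still need the old backend (for example, to keep reading existing LevelDB recovery data) can restore the previous behavior by setting the config explicitly. A minimal sketch using the standard `SparkConf` API (illustrative only, not part of this PR):

```scala
import org.apache.spark.SparkConf

// Illustrative only: opt back into the pre-change default so existing
// LevelDB recovery data keeps being read.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.db.backend", "LEVELDB") // default is now ROCKSDB
```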

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43142 from Hisoka-X/SPARK-45351-rocksdb-as-default-suffledb.

Authored-by: Jia Fan <fanjiaeminem@qq.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
Hisoka-X authored and LuciferYang committed Oct 4, 2023
1 parent ee2eeb7 commit 9830901
Showing 16 changed files with 35 additions and 33 deletions.
@@ -255,8 +255,8 @@ static boolean startsWith(byte[] key, byte[] prefix) {
 
   private boolean isEndMarker(byte[] key) {
     return (key.length > 2 &&
-      key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR &&
-      key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]);
+      key[key.length - 2] == RocksDBTypeInfo.KEY_SEPARATOR &&
+      key[key.length - 1] == RocksDBTypeInfo.END_MARKER[0]);
   }
 
   static int compare(byte[] a, byte[] b) {
@@ -67,8 +67,8 @@ public class ExternalShuffleBlockResolver {
   private static final ObjectMapper mapper = new ObjectMapper();
 
   /**
-   * This a common prefix to the key for each app registration we stick in leveldb, so they
-   * are easy to find, since leveldb lets you search based on prefix.
+   * This a common prefix to the key for each app registration we stick in RocksDB, so they
+   * are easy to find, since RocksDB lets you search based on prefix.
    */
   private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
   private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
@@ -126,7 +126,7 @@ public ShuffleIndexInformation load(String filePath) throws IOException {
         (filePath, indexInfo) -> indexInfo.getRetainedMemorySize())
       .build(indexCacheLoader);
     String dbBackendName =
-      conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
+      conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.ROCKSDB.name());
     DBBackend dbBackend = DBBackend.byName(dbBackendName);
     db = DBProvider.initDB(dbBackend, this.registeredExecutorFile, CURRENT_VERSION, mapper);
     if (db != null) {
@@ -350,7 +350,7 @@ void close() {
     try {
       db.close();
     } catch (IOException e) {
-      logger.error("Exception closing leveldb with registered executors", e);
+      logger.error("Exception closing RocksDB with registered executors", e);
     }
   }
 }
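
The Javadoc above relies on the backing store supporting prefix-based lookup, which is what makes the shared `AppExecShuffleInfo` key prefix useful. As an illustrative sketch (not Spark code; it assumes the `org.rocksdb` JNI bindings are on the classpath), a RocksDB iterator seeked to that prefix visits exactly the matching entries:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.rocksdb.{Options, RocksDB}

// Hypothetical demo of the prefix-scan idiom: seek to the common prefix,
// then iterate while keys still start with it.
object PrefixScanDemo {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/prefix-scan-demo")
    try {
      db.put("AppExecShuffleInfo;app-1;exec-1".getBytes(UTF_8), "{}".getBytes(UTF_8))
      val prefix = "AppExecShuffleInfo".getBytes(UTF_8)
      val it = db.newIterator()
      it.seek(prefix) // jump to the first key >= prefix
      while (it.isValid && it.key().startsWith(prefix)) {
        println(new String(it.key(), UTF_8))
        it.next()
      }
      it.close()
    } finally db.close()
  }
}
```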
@@ -177,7 +177,7 @@ public ShuffleIndexInformation load(String filePath) throws IOException {
       .build(indexCacheLoader);
     this.recoveryFile = recoveryFile;
     String dbBackendName =
-      conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name());
+      conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.ROCKSDB.name());
     DBBackend dbBackend = DBBackend.byName(dbBackendName);
     db = DBProvider.initDB(dbBackend, this.recoveryFile, CURRENT_VERSION, mapper);
     if (db != null) {
@@ -259,7 +259,7 @@ protected void serviceInit(Configuration externalConf) throws Exception {
 
     if (_recoveryPath != null) {
       String dbBackendName = _conf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND,
-        DBBackend.LEVELDB.name());
+        DBBackend.ROCKSDB.name());
       dbBackend = DBBackend.byName(dbBackendName);
       logger.info("Use {} as the implementation of {}",
         dbBackend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
@@ -1225,7 +1225,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
-    // If the hybrid store is enabled, try it first and fail back to leveldb store.
+    // If the hybrid store is enabled, try it first and fail back to RocksDB store.
     if (hybridStoreEnabled) {
       try {
         return createHybridStore(dm, appId, attempt, metadata)
@@ -27,7 +27,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History._
-import org.apache.spark.internal.config.History.HybridStoreDiskBackend.LEVELDB
+import org.apache.spark.internal.config.History.HybridStoreDiskBackend.ROCKSDB
 import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.util.{Clock, Utils}
@@ -57,7 +57,7 @@ private class HistoryServerDiskManager(
     throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).")
   }
   private val extension =
-    if (conf.get(HYBRID_STORE_DISK_BACKEND) == LEVELDB.toString) ".ldb" else ".rdb"
+    if (conf.get(HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb"
 
   private val tmpStoreDir = new File(path, "temp")
   if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
@@ -42,14 +42,14 @@ private[history] class HybridStore extends KVStore {
 
   private var diskStore: KVStore = null
 
-  // Flag to indicate whether we should use inMemoryStore or levelDB
+  // Flag to indicate whether we should use inMemoryStore or RocksDB
   private val shouldUseInMemoryStore = new AtomicBoolean(true)
 
   // Flag to indicate whether this hybrid store is closed, use this flag
   // to avoid starting background thread after the store is closed
   private val closed = new AtomicBoolean(false)
 
-  // A background thread that dumps data from inMemoryStore to levelDB
+  // A background thread that dumps data from inMemoryStore to RocksDB
   private var backgroundThread: Thread = null
 
   // A hash map that stores all classes that had been written to inMemoryStore
@@ -80,7 +80,7 @@
   override def delete(klass: Class[_], naturalKey: Object): Unit = {
     if (backgroundThread != null) {
       throw new IllegalStateException("delete() shouldn't be called after " +
-        "the hybrid store begins switching to levelDB")
+        "the hybrid store begins switching to RocksDB")
     }
 
     getStore().delete(klass, naturalKey)
@@ -119,7 +119,7 @@
       indexValues: Collection[_]): Boolean = {
     if (backgroundThread != null) {
       throw new IllegalStateException("removeAllByIndexValues() shouldn't be " +
-        "called after the hybrid store begins switching to levelDB")
+        "called after the hybrid store begins switching to RocksDB")
     }
 
     getStore().removeAllByIndexValues(klass, index, indexValues)
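
The comments in this hunk describe the hybrid store's hand-off: reads and writes go to an in-memory store until a background thread has dumped everything to the disk store, after which a flag flips reads over. A heavily simplified, hypothetical sketch of that pattern (not Spark's `KVStore` API):

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.concurrent.TrieMap

// Hypothetical toy version of the hybrid-store hand-off described above.
class TinyHybridStore[K, V](diskStore: TrieMap[K, V]) {
  private val inMemoryStore = TrieMap.empty[K, V]
  private val shouldUseInMemoryStore = new AtomicBoolean(true)

  private def store: TrieMap[K, V] =
    if (shouldUseInMemoryStore.get()) inMemoryStore else diskStore

  def write(k: K, v: V): Unit = store.put(k, v)
  def read(k: K): Option[V] = store.get(k)

  // Copy everything to the disk store in the background, then flip the flag
  // so subsequent reads are served from disk.
  def switchToDiskStore(): Thread = {
    val t = new Thread(() => {
      inMemoryStore.foreach { case (k, v) => diskStore.put(k, v) }
      shouldUseInMemoryStore.set(false)
    })
    t.setDaemon(true)
    t.start()
    t
  }
}
```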
@@ -730,7 +730,7 @@ package object config {
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(DBBackend.values.map(_.toString).toSet)
-      .createWithDefault(DBBackend.LEVELDB.name)
+      .createWithDefault(DBBackend.ROCKSDB.name)
 
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337)
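
The `ConfigBuilder` chain above upper-cases the supplied value before validating it, so `rocksdb`, `RocksDB`, and `ROCKSDB` all resolve to the same backend. A standalone, hypothetical re-creation of that normalize-then-validate step (not Spark's classes):

```scala
import java.util.Locale

// Hypothetical stand-in for DBBackend plus the transform/checkValues pair above.
object ShuffleDbBackendConf {
  private val validValues = Set("LEVELDB", "ROCKSDB")

  def resolve(raw: String): String = {
    val normalized = raw.toUpperCase(Locale.ROOT) // mirrors .transform(_.toUpperCase(Locale.ROOT))
    require(validValues.contains(normalized),     // mirrors .checkValues(...)
      s"Invalid value '$raw' for spark.shuffle.service.db.backend")
    normalized
  }

  def main(args: Array[String]): Unit = {
    println(resolve("rocksdb")) // prints ROCKSDB
  }
}
```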
@@ -715,8 +715,8 @@ object HistoryServerSuite {
     // generate the "expected" results for the characterization tests. Just blindly assume the
     // current behavior is correct, and write out the returned json to the test/resource files
 
-    // SPARK-38851: Use LevelDB backend because it is the default.
-    val suite = new LevelDBBackendHistoryServerSuite
+    // Use RocksDB backend because it is the default.
+    val suite = new RocksDBBackendHistoryServerSuite
     FileUtils.deleteDirectory(suite.getExpRoot)
     suite.getExpRoot.mkdirs()
     try {
@@ -79,7 +79,7 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T
     store.setMetadata(t1)
     assert(store.getMetadata(classOf[CustomType1]) === t1)
 
-    // Switch to LevelDB and set a new metadata
+    // Switch to RocksDB/LevelDB and set a new metadata
     switchHybridStore(store)
 
     val t2 = createCustomType1(2)
docs/configuration.md: 1 addition & 1 deletion
@@ -1280,7 +1280,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.shuffle.service.db.backend</code></td>
-  <td>LEVELDB</td>
+  <td>ROCKSDB</td>
   <td>
     Specifies a disk-based store used in shuffle service local db. Setting as LEVELDB or ROCKSDB.
   </td>
docs/core-migration-guide.md: 2 additions & 0 deletions
@@ -26,6 +26,8 @@ license: |
 
 - Since Spark 4.0, Spark will compress event logs. To restore the behavior before Spark 4.0, you can set `spark.eventLog.compress` to `false`.
 
+- Since Spark 4.0, `spark.shuffle.service.db.backend` is set to `ROCKSDB` by default which means Spark will use RocksDB store for shuffle service. To restore the behavior before Spark 4.0, you can set `spark.shuffle.service.db.backend` to `LEVELDB`.
+
 - In Spark 4.0, support for Apache Mesos as a resource manager was removed.
 
 ## Upgrading from Core 3.3 to 3.4
docs/running-on-yarn.md: 2 additions & 2 deletions
@@ -914,10 +914,10 @@ The following extra configuration options are available when the shuffle service
 </tr>
 <tr>
   <td><code>spark.shuffle.service.db.backend</code></td>
-  <td>LEVELDB</td>
+  <td>ROCKSDB</td>
   <td>
     When work-preserving restart is enabled in YARN, this is used to specify the disk-base store used
-    in shuffle service state store, supports `LEVELDB` and `ROCKSDB` with `LEVELDB` as default value.
+    in shuffle service state store, supports `LEVELDB` and `ROCKSDB` with `ROCKSDB` as default value.
     The original data store in `LevelDB/RocksDB` will not be automatically converted to another kind
     of storage now. The original data store will be retained and the new type data store will be
     created when switching storage types.
docs/spark-standalone.md: 2 additions & 2 deletions
@@ -332,10 +332,10 @@ SPARK_WORKER_OPTS supports the following system properties:
 </tr>
 <tr>
   <td><code>spark.shuffle.service.db.backend</code></td>
-  <td>LEVELDB</td>
+  <td>ROCKSDB</td>
   <td>
     When <code>spark.shuffle.service.db.enabled</code> is true, user can use this to specify the kind of disk-based
-    store used in shuffle service state store. This supports `LEVELDB` and `ROCKSDB` now and `LEVELDB` as default value.
+    store used in shuffle service state store. This supports `LEVELDB` and `ROCKSDB` now and `ROCKSDB` as default value.
     The original data store in `LevelDB/RocksDB` will not be automatically convert to another kind of storage now.
   </td>
   <td>3.4.0</td>
@@ -72,7 +72,7 @@ object ShuffleTestAccessor {
     resolver.db
   }
 
-  def mergeManagerLevelDB(mergeManager: RemoteBlockPushResolver): DB = {
+  def mergeManagerDB(mergeManager: RemoteBlockPushResolver): DB = {
     mergeManager.db
   }
 
@@ -387,7 +387,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
 
     val blockResolverDB = ShuffleTestAccessor.shuffleServiceDB(blockResolver)
     ShuffleTestAccessor.reloadRegisteredExecutors(blockResolverDB) should not be empty
-    val mergeManagerDB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager)
+    val mergeManagerDB = ShuffleTestAccessor.mergeManagerDB(mergeManager)
     ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager, mergeManagerDB) should not be empty
 
     s1.stopApplication(new ApplicationTerminationContext(app1Id))
@@ -613,7 +613,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val appPathsInfo3NoAttempt = new AppPathsInfo(localDirs3NoAttempt, 4)
 
     val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
-    val mergeManager1DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    val mergeManager1DB = ShuffleTestAccessor.mergeManagerDB(mergeManager1)
     ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
 
     ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
@@ -722,7 +722,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
 
     val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
-    val mergeManager1DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    val mergeManager1DB = ShuffleTestAccessor.mergeManagerDB(mergeManager1)
     ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
 
     ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
@@ -833,7 +833,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
 
     val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
-    val mergeManager1DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    val mergeManager1DB = ShuffleTestAccessor.mergeManagerDB(mergeManager1)
     ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
 
     mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
@@ -958,7 +958,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     s2 = createYarnShuffleServiceWithCustomMergeManager(
       ShuffleTestAccessor.createMergeManagerWithNoCleanupAfterReload)
     val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
-    val mergeManager2DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager2)
+    val mergeManager2DB = ShuffleTestAccessor.mergeManagerDB(mergeManager2)
     ShuffleTestAccessor.clearAppShuffleInfo(mergeManager2)
     assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
       mergeManager2, mergeManager2DB) == 1)
@@ -971,7 +971,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     s3.mergeManagerFile should be (mergeMgrFile)
 
     val mergeManager3 = s3.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
-    val mergeManager3DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager3)
+    val mergeManager3DB = ShuffleTestAccessor.mergeManagerDB(mergeManager3)
     appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager3)
     appShuffleInfo.size() equals 1
     appShuffleInfo.get(
@@ -1026,7 +1026,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
       ShuffleTestAccessor.createMergeManagerWithNoCleanupAfterReload)
 
     val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
-    val mergeManager2DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager2)
+    val mergeManager2DB = ShuffleTestAccessor.mergeManagerDB(mergeManager2)
     ShuffleTestAccessor.clearAppShuffleInfo(mergeManager2)
     assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
       mergeManager2, mergeManager2DB) == 1)
@@ -1115,7 +1115,7 @@
 
   test("create remote block push resolver instance") {
     val mockConf = mock(classOf[TransportConf])
-    when(mockConf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.LEVELDB.name()))
+    when(mockConf.get(Constants.SHUFFLE_SERVICE_DB_BACKEND, DBBackend.ROCKSDB.name()))
      .thenReturn(shuffleDBBackend().name())
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
       "org.apache.spark.network.shuffle.RemoteBlockPushResolver")
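
The last hunk stubs the backend lookup so the suite can exercise either backend regardless of the new default. A minimal, hypothetical illustration of the same Mockito stubbing pattern outside the suite (the `ConfLike` trait is a stand-in, not Spark's `TransportConf`):

```scala
import org.mockito.Mockito.{mock, when}

// Hypothetical trait standing in for TransportConf's two-argument get.
trait ConfLike { def get(key: String, default: String): String }

val mockConf = mock(classOf[ConfLike])
// Return LEVELDB even though the caller passes ROCKSDB as the default,
// mirroring how the suite forces a specific backend under test.
when(mockConf.get("spark.shuffle.service.db.backend", "ROCKSDB")).thenReturn("LEVELDB")
assert(mockConf.get("spark.shuffle.service.db.backend", "ROCKSDB") == "LEVELDB")
```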
