Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized RocksDBStore.IterateIndexes() for pruned chain storage. #4004

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ To be released.
### Bug fixes

- Fixed a bug in `IStore.PruneOutdatedChains()`. [[#3999]]
- Optimized `IStore.IterateIndexes()`. [[#4004]]

### Dependencies

### CLI tools

[#3997]: https://github.com/planetarium/libplanet/pull/3997
[#3999]: https://github.com/planetarium/libplanet/pull/3999
[#4004]: https://github.com/planetarium/libplanet/pull/4004


Version 5.4.0
Expand Down
2 changes: 1 addition & 1 deletion src/Libplanet.RocksDBStore/RocksDBStore.Prune.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private IEnumerable<BlockHash> IterateIndexesForPrune(Guid chainId)
private IEnumerable<BlockHash> IterateIndexesInnerForPrune(Guid chainId)
{
byte[] prefix = Concat(IndexKeyPrefix, chainId.ToByteArray());
foreach (Iterator it in IterateDb(_chainDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix))
{
byte[] value = it.Value();
yield return new BlockHash(value);
Expand Down
104 changes: 86 additions & 18 deletions src/Libplanet.RocksDBStore/RocksDBStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ public RocksDBStore(
});
}

private bool IsPruned => ListChainIds().Count() <= 1;

public static bool MigrateChainDBFromColumnFamilies(string path)
{
var opt = new DbOptions();
Expand All @@ -310,7 +312,7 @@ public static bool MigrateChainDBFromColumnFamilies(string path)
}

RocksDb db = RocksDb.Open(opt, path, cfs);
if (cfs.Count() == 1 && IterateDb(db, ChainIdKeyPrefix).Any())
if (cfs.Count() == 1 && IterateDbUnpruned(db, ChainIdKeyPrefix).Any())
{
// Already migrated.
db.Dispose();
Expand Down Expand Up @@ -419,7 +421,7 @@ void CopyIndexes(ColumnFamilyHandle cfh, long? limit)
/// <inheritdoc/>
public override IEnumerable<Guid> ListChainIds()
{
foreach (var it in IterateDb(_chainDb, ChainIdKeyPrefix))
foreach (var it in IterateDbUnpruned(_chainDb, ChainIdKeyPrefix))
{
var guid = new Guid(it.Value());
if (IsDeletionMarked(guid) && HasFork(guid))
Expand All @@ -444,7 +446,7 @@ public override void DeleteChainId(Guid chainId)
// FIXME: We should remove this code after adjusting .ForkTxNonces().
using var batch = new WriteBatch();
byte[] prefix = TxNonceKey(chainId);
foreach (Iterator k in IterateDb(_chainDb, prefix))
foreach (Iterator k in IterateDbUnpruned(_chainDb, prefix))
{
batch.Delete(k.Key());
}
Expand All @@ -460,12 +462,12 @@ public override void DeleteChainId(Guid chainId)
try
{
using var batch = new WriteBatch();
foreach (Iterator it in IterateDb(_chainDb, IndexKey(chainId)))
foreach (Iterator it in IterateDbUnpruned(_chainDb, IndexKey(chainId)))
{
batch.Delete(it.Key());
}

foreach (Iterator it in IterateDb(_chainDb, TxNonceKey(chainId)))
foreach (Iterator it in IterateDbUnpruned(_chainDb, TxNonceKey(chainId)))
{
batch.Delete(it.Key());
}
Expand Down Expand Up @@ -664,7 +666,7 @@ BlockHash branchpoint
}

using var batch = new WriteBatch();
foreach (Iterator k in IterateDb(_chainDb, IndexKey(destinationChainId)))
foreach (Iterator k in IterateDbUnpruned(_chainDb, IndexKey(destinationChainId)))
{
batch.Delete(k.Key());
}
Expand Down Expand Up @@ -817,7 +819,7 @@ public override IEnumerable<BlockHash> IterateBlockHashes()
{
byte[] prefix = BlockKeyPrefix;

foreach (Iterator it in IterateDb(_blockIndexDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_blockIndexDb, prefix))
{
byte[] key = it.Key();
byte[] hashBytes = key.Skip(prefix.Length).ToArray();
Expand Down Expand Up @@ -1014,7 +1016,7 @@ public override void DeleteTxIdBlockHashIndex(TxId txId, BlockHash blockHash)
public override IEnumerable<BlockHash> IterateTxIdBlockHashIndex(TxId txId)
{
var prefix = TxIdBlockHashIndexTxIdKey(txId);
foreach (var it in IterateDb(_txIdBlockHashIndexDb, prefix))
foreach (var it in IterateDbUnpruned(_txIdBlockHashIndexDb, prefix))
{
yield return new BlockHash(it.Value());
}
Expand Down Expand Up @@ -1043,7 +1045,7 @@ public override void PutTxExecution(TxExecution txExecution) =>
public override IEnumerable<KeyValuePair<Address, long>> ListTxNonces(Guid chainId)
{
byte[] prefix = TxNonceKey(chainId);
foreach (Iterator it in IterateDb(_chainDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix))
{
byte[] addressBytes = it.Key()
.Skip(prefix.Length)
Expand Down Expand Up @@ -1145,7 +1147,7 @@ public override void ForkTxNonces(Guid sourceChainId, Guid destinationChainId)
try
{
byte[] prefix = TxNonceKey(sourceChainId);
foreach (Iterator it in IterateDb(_chainDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix))
{
exist = true;
Address address = new Address(it.Key().Skip(prefix.Length).ToArray());
Expand Down Expand Up @@ -1296,7 +1298,8 @@ public override IEnumerable<BlockHash> GetBlockCommitHashes()
{
try
{
IEnumerable<Iterator> iterators = IterateDb(_blockCommitDb, Array.Empty<byte>());
IEnumerable<Iterator> iterators = IterateDbUnpruned(
_blockCommitDb, Array.Empty<byte>());

// FIXME: Somehow key value comes with 0x76 prefix at the first index of
// byte array.
Expand Down Expand Up @@ -1396,7 +1399,7 @@ public override void DeleteNextStateRootHash(BlockHash blockHash)
/// <inheritdoc/>
public override IEnumerable<EvidenceId> IteratePendingEvidenceIds()
{
foreach (Iterator it in IterateDb(_pendingEvidenceDb, PendingEvidenceKeyPrefix))
foreach (Iterator it in IterateDbUnpruned(_pendingEvidenceDb, PendingEvidenceKeyPrefix))
{
byte[] key = it.Key();
byte[] idBytes = key.Skip(PendingEvidenceKeyPrefix.Length).ToArray();
Expand Down Expand Up @@ -1756,10 +1759,16 @@ private static byte[] PendingEvidenceKey(in EvidenceId evidenceId) =>
private static byte[] CommittedEvidenceKey(in EvidenceId evidenceId) =>
CommittedEvidenceKeyPrefix.Concat(evidenceId.ByteArray).ToArray();

private static IEnumerable<Iterator> IterateDb(RocksDb db, byte[] prefix)
private static IEnumerable<Iterator> IterateDbUnpruned(
RocksDb db,
byte[] prefix,
byte[]? start = null)
{
using Iterator it = db.NewIterator();
for (it.Seek(prefix); it.Valid() && it.Key().StartsWith(prefix); it.Next())
for (
it.Seek(start is { } s ? s : prefix);
it.Valid() && it.Key().StartsWith(prefix);
it.Next())
{
yield return it;
}
Expand Down Expand Up @@ -1813,6 +1822,48 @@ private void LogUnexpectedException(string methodName, Exception e)
}

private IEnumerable<BlockHash> IterateIndexes(
Guid chainId,
long offset,
long? limit,
bool includeDeleted) => IsPruned
? IterateIndexesPruned(chainId, offset, limit, includeDeleted)
: IterateIndexesUnpruned(chainId, offset, limit, includeDeleted);

private IEnumerable<BlockHash> IterateIndexesPruned(
Guid chainId,
long offset,
long? limit,
bool includeDeleted)
{
if (!includeDeleted && IsDeletionMarked(chainId))
{
yield break;
}

long count = 0;
long limitUpperBound = CountIndex(chainId) - offset;
long actualLimit = limit is { } l
? Math.Min(l, limitUpperBound)
: limitUpperBound;

if (actualLimit <= 0)
{
yield break;
}

foreach (BlockHash hash in IterateIndexesInnerPruned(chainId, offset))
{
yield return hash;
count += 1;

if (count >= actualLimit)
{
yield break;
}
}
}

private IEnumerable<BlockHash> IterateIndexesUnpruned(
Guid chainId,
long offset,
long? limit,
Expand Down Expand Up @@ -1859,7 +1910,7 @@ bool includeDeleted
long expectedCount = chainTipIndex - previousChainTipIndex +
(GetPreviousChainInfo(cid) is null ? 1 : 0);

foreach (BlockHash hash in IterateIndexesInner(cid, expectedCount))
foreach (BlockHash hash in IterateIndexesInnerUnpruned(cid, expectedCount))
{
if (offset > 0)
{
Expand All @@ -1880,11 +1931,28 @@ bool includeDeleted
}
}

private IEnumerable<BlockHash> IterateIndexesInner(Guid chainId, long expectedCount)
private IEnumerable<BlockHash> IterateIndexesInnerPruned(
Guid chainId,
long offset)
{
byte[] start = Concat(
IndexKeyPrefix,
chainId.ToByteArray(),
RocksDBStoreBitConverter.GetBytes(offset));
byte[] prefix = Concat(IndexKeyPrefix, chainId.ToByteArray());

foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix, start))
{
byte[] value = it.Value();
yield return new BlockHash(value);
}
}

private IEnumerable<BlockHash> IterateIndexesInnerUnpruned(Guid chainId, long expectedCount)
{
long count = 0;
byte[] prefix = Concat(IndexKeyPrefix, chainId.ToByteArray());
foreach (Iterator it in IterateDb(_chainDb, prefix))
foreach (Iterator it in IterateDbUnpruned(_chainDb, prefix))
{
if (count >= expectedCount)
{
Expand All @@ -1910,7 +1978,7 @@ private void RemoveFork(Guid chainId, Guid forkedChainId)
private bool HasFork(Guid chainId)
{
byte[] prefix = Concat(ForkedChainsKeyPrefix, chainId.ToByteArray());
return IterateDb(_chainDb, prefix).Any();
return IterateDbUnpruned(_chainDb, prefix).Any();
}

private bool IsDeletionMarked(Guid chainId)
Expand Down
27 changes: 0 additions & 27 deletions test/Libplanet.Tests/Store/StoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,33 +284,6 @@ public void DeleteChainIdWithForksReverse()
Assert.Empty(store.IterateIndexes(chainC));
}

[SkippableFact]
public void ForkFromChainWithDeletion()
{
IStore store = Fx.Store;
Guid chainA = Guid.NewGuid();
Guid chainB = Guid.NewGuid();
Guid chainC = Guid.NewGuid();

// We need `Block<T>`s because `IStore` can't retrieve index(long) by block hash without
// actual block...
store.PutBlock(Fx.GenesisBlock);
store.PutBlock(Fx.Block1);
store.PutBlock(Fx.Block2);
store.PutBlock(Fx.Block3);

store.AppendIndex(chainA, Fx.GenesisBlock.Hash);
store.AppendIndex(chainA, Fx.Block1.Hash);
store.ForkBlockIndexes(chainA, chainB, Fx.Block1.Hash);
store.DeleteChainId(chainA);

store.ForkBlockIndexes(chainB, chainC, Fx.Block1.Hash);
Assert.Equal(
Fx.Block1.Hash,
store.IndexBlockHash(chainC, Fx.Block1.Index)
);
}

[SkippableFact]
public void CanonicalChainId()
{
Expand Down
Loading