Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removal of '#nullable disable' from the Libplanet.Net project (swarm) #3670

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@ To be released.
- Removed the '#nullable disable' from the Libplanet.Store project. [[#3644]]
- Removed the '#nullable disable' from the Libplanet.RocksDBStore project.
[[#3651]]
- (Libplanet.Net) Changed swarm-related types due to removal of
'nullable keyword'. [[#3670]]
- Changed `options` parameter type of `Swarm` class constructor from
`SwarmOptions` to `SwarmOptions?`.
- Changed `consensusTransport` parameter type of `Swarm` class constructor
from `ITransport` to `ITransport?`.
- Changed `Swarm.Validators` property type
from `IReadOnlyList<BoundPeer>` to `IReadOnlyList<BoundPeer>?`.
- Changed `progress` parameter type of `Swarm.PreloadAsync` method
from `IProgress<BlockSyncState>` to `IProgress<BlockSyncState>?`.
- Changed return type of `Swarm.FindSpecificPeerAsync` method
from `Task<BoundPeer>` to `Task<BoundPeer?>`.

### Backward-incompatible network protocol changes

Expand Down Expand Up @@ -45,6 +57,7 @@ To be released.
[#3644]: https://github.com/planetarium/libplanet/pull/3644
[#3651]: https://github.com/planetarium/libplanet/pull/3651
[#3669]: https://github.com/planetarium/libplanet/pull/3669
[#3670]: https://github.com/planetarium/libplanet/pull/3670


Version 4.0.4
Expand Down
15 changes: 7 additions & 8 deletions Libplanet.Net/Swarm.BlockCandidate.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#nullable disable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand All @@ -19,7 +18,7 @@ public partial class Swarm
private async Task ConsumeBlockCandidates(
TimeSpan? checkInterval = null,
bool render = true,
IProgress<BlockSyncState> progress = null,
IProgress<BlockSyncState>? progress = null,
CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -63,10 +62,10 @@ private async Task ConsumeBlockCandidates(
private bool BlockCandidateProcess(
Branch candidate,
bool render,
IProgress<BlockSyncState> progress,
IProgress<BlockSyncState>? progress,
CancellationToken cancellationToken)
{
BlockChain synced = null;
BlockChain? synced = null;
System.Action renderSwap = () => { };
try
{
Expand Down Expand Up @@ -99,7 +98,7 @@ private bool BlockCandidateProcess(
}

if (synced is { } syncedB
&& !syncedB.Id.Equals(BlockChain?.Id)
&& !syncedB.Id.Equals(BlockChain.Id)
&& BlockChain.Tip.Index < syncedB.Tip.Index)
{
_logger.Debug(
Expand All @@ -126,15 +125,15 @@ private BlockChain AppendPreviousBlocks(
BlockChain blockChain,
Branch candidate,
bool render,
IProgress<BlockSyncState> progress)
IProgress<BlockSyncState>? progress)
{
BlockChain workspace = blockChain;
List<Guid> scope = new List<Guid>();
bool forked = false;

Block oldTip = workspace.Tip;
Block newTip = candidate.Blocks.Last().Item1;
List<(Block, BlockCommit)> blocks = candidate.Blocks.ToList();
List<(Block, BlockCommit?)> blocks = candidate.Blocks.ToList();
Block branchpoint = FindBranchpoint(
oldTip,
newTip,
Expand Down Expand Up @@ -429,7 +428,7 @@ private async Task<bool> BlockCandidateDownload(
return false;
}

IAsyncEnumerable<(Block, BlockCommit)> blocksAsync = GetBlocksAsync(
IAsyncEnumerable<(Block, BlockCommit?)> blocksAsync = GetBlocksAsync(
peer,
hashes.Select(pair => pair.Item2),
cancellationToken);
Expand Down
19 changes: 9 additions & 10 deletions Libplanet.Net/Swarm.BlockSync.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#nullable disable
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -53,7 +52,7 @@ internal async Task PullBlocksAsync(
TimeSpan? timeout,
int maximumPollPeers,
int chunkSize,
IProgress<BlockSyncState> progress,
IProgress<BlockSyncState>? progress,
CancellationToken cancellationToken)
{
if (maximumPollPeers <= 0)
Expand All @@ -72,7 +71,7 @@ await GetPeersWithExcerpts(
private async Task PullBlocksAsync(
List<(BoundPeer, IBlockExcerpt)> peersWithBlockExcerpt,
int chunkSize,
IProgress<BlockSyncState> progress,
IProgress<BlockSyncState>? progress,
CancellationToken cancellationToken)
{
if (!peersWithBlockExcerpt.Any())
Expand All @@ -84,7 +83,7 @@ private async Task PullBlocksAsync(
long totalBlocksToDownload = 0L;
long receivedBlockCount = 0L;
Block tempTip = BlockChain.Tip;
var blocks = new List<(Block, BlockCommit)>();
var blocks = new List<(Block, BlockCommit?)>();

try
{
Expand Down Expand Up @@ -142,15 +141,15 @@ private async Task PullBlocksAsync(
return;
}

IAsyncEnumerable<Tuple<Block, BlockCommit, BoundPeer>> completedBlocks =
IAsyncEnumerable<Tuple<Block, BlockCommit?, BoundPeer>> completedBlocks =
blockCompletion.Complete(
peers: peersWithBlockExcerpt.Select(pair => pair.Item1).ToList(),
blockFetcher: GetBlocksAsync,
cancellationToken: cancellationToken
);

await foreach (
(Block block, BlockCommit commit, BoundPeer sourcePeer)
(Block block, BlockCommit? commit, BoundPeer sourcePeer)
in completedBlocks.WithCancellation(cancellationToken))
{
_logger.Verbose(
Expand Down Expand Up @@ -208,13 +207,13 @@ in completedBlocks.WithCancellation(cancellationToken))
}

BlockHash? previousHash = blocks.First().Item1.PreviousHash;
Block branchpoint;
BlockCommit branchpointCommit;
Block? branchpoint;
BlockCommit? branchpointCommit;
if (previousHash != null)
{
branchpoint = BlockChain.Store.GetBlock(
(BlockHash)previousHash);
branchpointCommit = BlockChain.GetBlockCommit(branchpoint.Hash);
branchpointCommit = BlockChain.GetBlockCommit(branchpoint!.Hash);
}
else
{
Expand Down Expand Up @@ -321,7 +320,7 @@ await PullBlocksAsync(
}
}

private void OnBlockChainTipChanged(object sender, (Block OldTip, Block NewTip) e)
private void OnBlockChainTipChanged(object? sender, (Block OldTip, Block NewTip) e)
{
if (Running)
{
Expand Down
1 change: 0 additions & 1 deletion Libplanet.Net/Swarm.MessageHandlers.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#nullable disable
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down
52 changes: 26 additions & 26 deletions Libplanet.Net/Swarm.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#nullable disable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand Down Expand Up @@ -39,9 +38,9 @@ public partial class Swarm : IDisposable

private readonly ILogger _logger;
private readonly IStore _store;
private readonly ConsensusReactor _consensusReactor;
private readonly ConsensusReactor? _consensusReactor;

private CancellationTokenSource _workerCancellationTokenSource;
private CancellationTokenSource? _workerCancellationTokenSource;
private CancellationToken _cancellationToken;

private bool _disposed;
Expand All @@ -66,8 +65,8 @@ public Swarm(
BlockChain blockChain,
PrivateKey privateKey,
ITransport transport,
SwarmOptions options = null,
ITransport consensusTransport = null,
SwarmOptions? options = null,
ITransport? consensusTransport = null,
ConsensusReactorOption? consensusOption = null)
{
BlockChain = blockChain ?? throw new ArgumentNullException(nameof(blockChain));
Expand Down Expand Up @@ -95,7 +94,7 @@ public Swarm(
// code, the portion initializing the swarm in Agent.cs in NineChronicles should be
// fixed. for context, refer to
// https://github.com/planetarium/libplanet/discussions/2303.
Transport = transport;
Transport = transport ?? throw new ArgumentNullException(nameof(transport));
_processBlockDemandSessions = new ConcurrentDictionary<BoundPeer, int>();
Transport.ProcessMessageHandler.Register(ProcessMessageHandlerAsync);
PeerDiscovery = new KademliaProtocol(RoutingTable, Transport, Address);
Expand Down Expand Up @@ -138,11 +137,11 @@ public Swarm(

public bool ConsensusRunning => _consensusReactor?.Running ?? false;

public DnsEndPoint EndPoint => AsPeer is BoundPeer boundPeer ? boundPeer.EndPoint : null;
public DnsEndPoint EndPoint => AsPeer.EndPoint;

public Address Address => _privateKey.Address;

public BoundPeer AsPeer => Transport?.AsPeer;
public BoundPeer AsPeer => Transport.AsPeer;

/// <summary>
/// The last time when any message was arrived.
Expand All @@ -159,7 +158,7 @@ public Swarm(
/// Returns list of the validators that consensus has in its routing table.
/// If the node is not joining consensus, returns <c>null</c>.
/// </summary>
public IReadOnlyList<BoundPeer> Validators => _consensusReactor?.Validators;
public IReadOnlyList<BoundPeer>? Validators => _consensusReactor?.Validators;

/// <summary>
/// The <see cref="BlockChain"/> instance this <see cref="Swarm"/> instance
Expand All @@ -183,7 +182,7 @@ public Swarm(

internal TxCompletion<BoundPeer> TxCompletion { get; }

internal AsyncAutoResetEvent TxReceived => TxCompletion?.TxReceived;
internal AsyncAutoResetEvent TxReceived => TxCompletion.TxReceived;

internal AsyncAutoResetEvent BlockHeaderReceived { get; }

Expand All @@ -200,23 +199,24 @@ public Swarm(
internal SwarmOptions Options { get; }

// FIXME: This should be exposed in a better way.
internal ConsensusReactor ConsensusReactor => _consensusReactor;
internal ConsensusReactor ConsensusReactor => _consensusReactor ??
throw new InvalidOperationException();

/// <summary>
/// Waits until this <see cref="Swarm"/> instance gets started to run.
/// </summary>
/// <seealso cref="ITransport.WaitForRunningAsync()"/>
/// <returns>A <see cref="Task"/> completed when <see cref="ITransport.Running"/>
/// property becomes <see langword="true"/>.</returns>
public Task WaitForRunningAsync() => Transport?.WaitForRunningAsync();
public Task WaitForRunningAsync() => Transport.WaitForRunningAsync();

public void Dispose()
{
if (!_disposed)
{
_workerCancellationTokenSource?.Cancel();
TxCompletion?.Dispose();
Transport?.Dispose();
Transport.Dispose();
_consensusReactor?.Dispose();
_workerCancellationTokenSource?.Dispose();
_disposed = true;
Expand Down Expand Up @@ -516,7 +516,7 @@ CancellationToken cancellationToken
/// <exception cref="AggregateException">Thrown when the given the block downloading is
/// failed.</exception>
public async Task PreloadAsync(
IProgress<BlockSyncState> progress = null,
IProgress<BlockSyncState>? progress = null,
CancellationToken cancellationToken = default)
{
await PreloadAsync(
Expand Down Expand Up @@ -556,7 +556,7 @@ await PreloadAsync(
public async Task PreloadAsync(
TimeSpan? dialTimeout,
long tipDeltaThreshold,
IProgress<BlockSyncState> progress = null,
IProgress<BlockSyncState>? progress = null,
CancellationToken cancellationToken = default)
{
using CancellationTokenRegistration ctr = cancellationToken.Register(() =>
Expand Down Expand Up @@ -663,7 +663,7 @@ await ConsumeBlockCandidates(
/// A <see cref="BoundPeer"/> with <see cref="Address"/> of <paramref name="target"/>.
/// Returns <see langword="null"/> if the peer with address does not exist.
/// </returns>
public async Task<BoundPeer> FindSpecificPeerAsync(
public async Task<BoundPeer?> FindSpecificPeerAsync(
Address target,
int depth = 3,
TimeSpan? timeout = null,
Expand Down Expand Up @@ -805,7 +805,7 @@ internal async IAsyncEnumerable<Tuple<long, BlockHash>> GetBlockHashes(
throw new InvalidMessageContentException(errorMessage, parsedMessage.Content);
}

internal async IAsyncEnumerable<(Block, BlockCommit)> GetBlocksAsync(
internal async IAsyncEnumerable<(Block, BlockCommit?)> GetBlocksAsync(
BoundPeer peer,
IEnumerable<BlockHash> blockHashes,
[EnumeratorCancellation] CancellationToken cancellationToken
Expand Down Expand Up @@ -870,7 +870,7 @@ [EnumeratorCancellation] CancellationToken cancellationToken
cancellationToken.ThrowIfCancellationRequested();
Block block = BlockMarshaler.UnmarshalBlock(
(Bencodex.Types.Dictionary)Codec.Decode(blockPayload));
BlockCommit commit = commitPayload.Length == 0
BlockCommit? commit = commitPayload.Length == 0
? null
: new BlockCommit(Codec.Decode(commitPayload));

Expand Down Expand Up @@ -985,7 +985,7 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
BlockChain blockChain,
IList<(BoundPeer, IBlockExcerpt)> peersWithExcerpts,
int chunkSize = int.MaxValue,
IProgress<BlockSyncState> progress = null,
IProgress<BlockSyncState>? progress = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
Expand All @@ -1008,7 +1008,7 @@ internal async IAsyncEnumerable<Transaction> GetTxsAsync(
int totalBlockHashesToDownload = -1;
int chunkBlockHashesToDownload = -1;
var pairsToYield = new List<Tuple<long, BlockHash>>();
Exception error = null;
Exception? error = null;
try
{
var downloaded = new List<BlockHash>();
Expand Down Expand Up @@ -1181,7 +1181,7 @@ private void BroadcastBlock(Address? except, Block block)
BroadcastMessage(except, message);
}

private void BroadcastTxs(BoundPeer except, IEnumerable<Transaction> txs)
private void BroadcastTxs(BoundPeer? except, IEnumerable<Transaction> txs)
{
List<TxId> txIds = txs.Select(tx => tx.Id).ToList();
_logger.Information("Broadcasting {Count} txIds...", txIds.Count);
Expand Down Expand Up @@ -1221,7 +1221,7 @@ private void BroadcastMessage(Address? except, MessageContent message)
pair => pair.Item2 is { } chainStatus &&
genesisHash.Equals(chainStatus.GenesisHash) &&
chainStatus.TipIndex > tip.Index)
.Select(pair => (pair.Item1, (IBlockExcerpt)pair.Item2))
.Select(pair => (pair.Item1, (IBlockExcerpt)pair.Item2!))
.OrderByDescending(pair => pair.Item2.Index)
.ToList();
}
Expand All @@ -1241,7 +1241,7 @@ private void BroadcastMessage(Address? except, MessageContent message)
/// of <see cref="BoundPeer"/> and <see cref="ChainStatusMsg"/> where
/// <see cref="ChainStatusMsg"/> can be <see langword="null"/> if dialing fails for
/// a selected <see cref="BoundPeer"/>.</returns>
private Task<(BoundPeer, ChainStatusMsg)[]> DialExistingPeers(
private Task<(BoundPeer, ChainStatusMsg?)[]> DialExistingPeers(
TimeSpan? dialTimeout,
int maxPeersToDial,
CancellationToken cancellationToken)
Expand All @@ -1268,15 +1268,15 @@ void LogException(BoundPeer peer, Task<Message> task)
}

var rnd = new System.Random();
IEnumerable<Task<(BoundPeer, ChainStatusMsg)>> tasks = Peers.OrderBy(_ => rnd.Next())
IEnumerable<Task<(BoundPeer, ChainStatusMsg?)>> tasks = Peers.OrderBy(_ => rnd.Next())
.Take(maxPeersToDial)
.Select(
peer => Transport.SendMessageAsync(
peer,
new GetChainStatusMsg(),
dialTimeout,
cancellationToken
).ContinueWith<(BoundPeer, ChainStatusMsg)>(
).ContinueWith<(BoundPeer, ChainStatusMsg?)>(
task =>
{
if (task.IsFaulted || task.IsCanceled ||
Expand All @@ -1300,7 +1300,7 @@ void LogException(BoundPeer peer, Task<Message> task)
{
if (task.IsFaulted)
{
throw task.Exception;
throw task.Exception!;
}

return task.Result.ToArray();
Expand Down
Loading