Skip to content

Commit

Permalink
Added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
greymistcube committed Nov 25, 2024
1 parent bff5f7f commit 58a4a6b
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 48 deletions.
4 changes: 4 additions & 0 deletions src/Libplanet.Net/Protocols/ReqRepProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public async Task DialAsync(
try
{
_transport.RequestMessageToSend += eventHandler;
_logger.Debug(
"Ready to send a request message to {RemotePeer} as {LocalPeer}",
context.RemotePeer.Address,
context.LocalPeer.Address);
await SendAndReceiveMessage(request, downChannel, context);
}
finally
Expand Down
66 changes: 38 additions & 28 deletions src/Libplanet.Net/Transports/Libp2pTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Dasync.Collections;
using Libplanet.Crypto;
using Libplanet.Net.Messages;
using Libplanet.Net.Options;
Expand All @@ -21,6 +20,8 @@ namespace Libplanet.Net.Transports
public class Libp2pTransport : ITransport
#pragma warning restore S101
{
public const int DialProtocolDelay = 100;

private readonly PrivateKey _privateKey;
private readonly ILogger _logger;
private readonly HostOptions _hostOptions;
Expand All @@ -33,6 +34,7 @@ public class Libp2pTransport : ITransport
private Multiaddress? _listenerAddress = null;

private CancellationTokenSource _runtimeCancellationTokenSource;
private bool _running = false;
private bool _disposed = false;

public Libp2pTransport(
Expand Down Expand Up @@ -81,7 +83,7 @@ public Libp2pTransport(

public DateTimeOffset? LastMessageTimestamp { get; }

public bool Running { get; }
public bool Running => _running;

public AppProtocolVersion AppProtocolVersion { get; }

Expand Down Expand Up @@ -113,12 +115,14 @@ public static async Task<Libp2pTransport> Create(
public Task StartAsync(CancellationToken cancellationToken = default)
{
// Does nothing.
_running = true;
return Task.CompletedTask;
}

public Task StopAsync(TimeSpan waitFor, CancellationToken cancellationToken = default)
{
// Does nothing.
_running = false;
_runtimeCancellationTokenSource.Cancel();
return Task.CompletedTask;
}
Expand Down Expand Up @@ -155,13 +159,6 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
bool returnWhenTimeout,
CancellationToken cancellationToken)
{
_logger.Information(
"Trying to dial {Remote} As {LocalPeer}",
peer.Multiaddress,
LocalPeer.Address);
IRemotePeer remote = await LocalPeer.DialAsync(peer.Multiaddress, default);
_logger.Information("Dialing to {Remote} successful", peer.Multiaddress);

// FIXME: There should be default maximum timeout.
CancellationTokenSource timerCts = new CancellationTokenSource();
if (timeout is { } timeoutNotNull)
Expand All @@ -175,6 +172,16 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
cancellationToken,
timerCts.Token);

_logger.Verbose(
"Trying to dial {Remote} as {LocalPeer}",
peer.Multiaddress,
LocalPeer.Address);
IRemotePeer remote = await LocalPeer.DialAsync(peer.Multiaddress);
_logger.Verbose(
"Dialing to {Remote} as {LocalPeer} was successful",
peer.Multiaddress,
LocalPeer.Address);

// FIXME: Add logging.
_ = remote.DialAsync<ReqRepProtocol>(linkedCts.Token);

Expand All @@ -188,12 +195,12 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
// FIXME: The tasks may not be ready to consume the message.
// There needs to be a way to know whether the connection is ready
// to consume the message.
await Task.Delay(100);
await Task.Delay(DialProtocolDelay);
Channel<Message> inboundReplyChannel = Channel.CreateUnbounded<Message>();
_logger.Information("Invoking sending message");
RequestMessageToSend?.Invoke(
this,
(peer.Multiaddress, message, 1, inboundReplyChannel));
(remote.Address, message, 1, inboundReplyChannel));

List<Message> replyMessages = new List<Message>();

Expand Down Expand Up @@ -248,26 +255,33 @@ public async Task<IEnumerable<Message>> SendMessageAsync(
replyMessages.Count);
}

return returnWhenTimeout
? replyMessages
: new List<Message>();
#pragma warning disable S3358 // Extract this ternary expresion.

Check warning on line 258 in src/Libplanet.Net/Transports/Libp2pTransport.cs

View workflow job for this annotation

GitHub Actions / typos

"expresion" should be "expression".
return timerCts.IsCancellationRequested
? returnWhenTimeout
? replyMessages
: new List<Message>()
: replyMessages;
#pragma warning restore S3358
}

public void BroadcastMessage(IEnumerable<BoundPeer> peers, MessageContent content)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(NetMQTransport));
throw new ObjectDisposedException(nameof(Libp2pTransport));
}

CancellationToken ct = _runtimeCancellationTokenSource.Token;
List<BoundPeer> boundPeers = peers.ToList();
Task.Run(
async () =>
await boundPeers.ParallelForEachAsync(
peer => SendMessageAsync(peer, content, TimeSpan.FromSeconds(1), ct),
ct),
ct);

// FIXME: Parallel does not work.
// Also should catch an exception and ignore it.
foreach (var boundPeer in boundPeers)
{
_ = SendMessageAsync(boundPeer, content, TimeSpan.FromSeconds(1), ct)
.GetAwaiter()
.GetResult();
}

_logger.Debug(
"Broadcasting message {Message} as {AsPeer} to {PeerCount} peers",
Expand Down Expand Up @@ -298,16 +312,12 @@ private async Task Initialize(
{
// FIXME: Host being null should be dealt with.
_logger.Information("Initialization started");
string localPeerAddressTemplate = $"/ip4/{_hostOptions.Host}/tcp/0";
string listenerAddressTemplate = $"/ip4/{_hostOptions.Host}/tcp/{_hostOptions.Port}";
string addressTemplate = $"/ip4/{_hostOptions.Host}/tcp/{_hostOptions.Port}";
IPeerFactory peerFactory = serviceProvider.GetService<IPeerFactory>()!;
_logger.Information("Peer factory obtained");
ILocalPeer localPeer = peerFactory.Create(
CryptoKeyConverter.ToLibp2pIdentity(_privateKey),
localPeerAddressTemplate);
CryptoKeyConverter.ToLibp2pIdentity(_privateKey), addressTemplate);
_logger.Information("Local peer created at {LocalPeerAddress}", localPeer.Address);
IListener listener = await localPeer.ListenAsync(
listenerAddressTemplate, cancellationToken);
IListener listener = await localPeer.ListenAsync(addressTemplate, cancellationToken);
_logger.Information("Listener started at {ListenerAddress}", listener.Address);

_localPeer = localPeer;
Expand Down
115 changes: 95 additions & 20 deletions test/Libplanet.Net.Tests/Transports/Libp2pTransportTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Crypto;
using Libplanet.Net.Messages;
Expand All @@ -18,8 +19,10 @@

namespace Libplanet.Net.Tests.Transports
{
[CollectionDefinition(nameof(Libp2pTransportTest), DisableParallelization = true)]
public class Libp2pTransportTest : IDisposable
{
public const int Timeout = 30_000;
private bool _disposed;
private ILogger _logger;

Expand All @@ -41,7 +44,7 @@ public Libp2pTransportTest(ITestOutputHelper testOutputHelper)
Dispose(false);
}

[Fact(Timeout = 10_000)]
[Fact(Timeout = Timeout)]
public async Task Initialize()
{
PrivateKey privateKey = new PrivateKey();
Expand All @@ -67,10 +70,16 @@ public async Task Initialize()
Assert.Equal(expected, transport.AsPeer);
}

[Fact(Timeout = 10_000)]
public async Task DialToListeners()
[Theory(Timeout = Timeout)]
[InlineData(true)]
[InlineData(false)]
public async Task DialToListeners(bool usePortZero)
{
// NOTE: Using port 0 does not work for this test.
int count = 2;
List<int> freePorts = usePortZero
? Enumerable.Range(0, count).Select(_ => 0).ToList()
: TestUtils.GetFreePorts(count);
List<PrivateKey> privateKeys = Enumerable
.Range(0, count)
.Select(_ => new PrivateKey())
Expand All @@ -80,7 +89,7 @@ public async Task DialToListeners()
.Select(i => new Libp2pTransport(
privateKeys[i],
new AppProtocolVersionOptions(),
new HostOptions("127.0.0.1", new IceServer[] { }, 0)))
new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[i])))
.ToList();
List<IServiceProvider> serviceProviders = transports
.Select(transport => GetServiceProvider(transport))
Expand All @@ -92,11 +101,12 @@ public async Task DialToListeners()
.Range(0, count)
.Select(i => peerFactories[i].Create(
CryptoKeyConverter.ToLibp2pIdentity(privateKeys[i]),
"/ip4/127.0.0.1/tcp/0"))
$"/ip4/127.0.0.1/tcp/{freePorts[i]}"))
.ToList();
List<IListener> listeners = localPeers
.Select(async localPeer =>
await localPeer.ListenAsync("/ip4/127.0.0.1/tcp/0", default))
List<IListener> listeners = Enumerable
.Range(0, count)
.Select(async i =>
await localPeers[i].ListenAsync($"/ip4/127.0.0.1/tcp/{freePorts[i]}", default))
.Select(task => task.Result)
.ToList();
List<Multiaddress> listenerAddresses = listeners
Expand All @@ -110,15 +120,37 @@ await localPeer.ListenAsync("/ip4/127.0.0.1/tcp/0", default))
Assert.Equal(listenerAddresses[0], remote1.Address);
}

[Fact(Timeout = 10_000)]
public async Task DialToTransports()
[Fact(Timeout = Timeout)]
public async Task DialCancel()
{
PrivateKey privateKey = new PrivateKey();
List<int> freePorts = TestUtils.GetFreePorts(2);
Libp2pTransport transport = await Libp2pTransport.Create(
privateKey,
new AppProtocolVersionOptions(),
new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[0]));

Identity identity = CryptoKeyConverter.ToLibp2pIdentity(new PrivateKey());
Multiaddress badAddress = $"/ip4/127.0.0.1/tcp/{freePorts[1]}/p2p/{identity.PeerId}";
CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter(1_000);
await Assert.ThrowsAsync<TaskCanceledException>(
async () => await transport.LocalPeer.DialAsync(badAddress, cts.Token));
}

[Theory(Timeout = Timeout)]
[InlineData(true)]
[InlineData(false)]
public async Task DialToTransports(bool usePortZero)
{
int count = 2;
List<int> freePorts = usePortZero
? Enumerable.Range(0, count).Select(_ => 0).ToList()
: TestUtils.GetFreePorts(count);
List<PrivateKey> privateKeys = Enumerable
.Range(0, count)
.Select(_ => new PrivateKey())
.ToList();
List<int> freePorts = TestUtils.GetFreePorts(2);
List<HostOptions> hosts = freePorts
.Select(freePort => new HostOptions("127.0.0.1", new IceServer[] { }, freePort))
.ToList();
Expand All @@ -140,15 +172,19 @@ public async Task DialToTransports()
Assert.Equal(transports[0].ListenerAddress, remote1.Address);
}

[Fact(Timeout = 10_000)]
public async Task RequestReply()
[Theory(Timeout = Timeout)]
[InlineData(true)]
[InlineData(false)]
public async Task RequestReply(bool usePortZero)
{
int count = 2;
List<int> freePorts = usePortZero
? Enumerable.Range(0, count).Select(_ => 0).ToList()
: TestUtils.GetFreePorts(count);
List<PrivateKey> privateKeys = Enumerable
.Range(0, count)
.Select(_ => new PrivateKey())
.ToList();
List<int> freePorts = TestUtils.GetFreePorts(2);
List<Libp2pTransport> transports = Enumerable
.Range(0, count)
.Select(async i => await Libp2pTransport.Create(
Expand All @@ -166,15 +202,54 @@ public async Task RequestReply()
}
});

List<Message> reply = (await transports[0].SendMessageAsync(
Message reply = await transports[0].SendMessageAsync(
transports[1].AsPeer,
new PingMsg(),
TimeSpan.FromSeconds(5),
1,
true,
default)).ToList();
Message single = Assert.Single<Message>(reply);
Assert.IsType<PongMsg>(single.Content);
default);
Assert.IsType<PongMsg>(reply.Content);
}

[Theory(Timeout = Timeout)]
[InlineData(true)]
[InlineData(false)]
public async Task Broadcast(bool usePortZero)
{
int count = 4;
List<int> freePorts = usePortZero
? Enumerable.Range(0, count).Select(_ => 0).ToList()
: TestUtils.GetFreePorts(count);
List<PrivateKey> privateKeys = Enumerable
.Range(0, count)
.Select(_ => new PrivateKey())
.ToList();
List<Libp2pTransport> transports = Enumerable
.Range(0, count)
.Select(async i => await Libp2pTransport.Create(
privateKeys[i],
new AppProtocolVersionOptions(),
new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[i])))
.Select(task => task.Result)
.ToList();

int receivedCount = 0;
foreach (var transport in transports.Skip(1).ToList())
{
transport.ProcessMessageHandler.Register(async (message, channel) =>
{
if (message.Content is PingMsg)
{
await channel.Writer.WriteAsync(new PongMsg());
receivedCount++;
}
});
}

transports[0].BroadcastMessage(
transports.Skip(1).Select(transport => transport.AsPeer).ToList(),
new PingMsg());
await Task.Delay(1_000);
Assert.Equal(count - 1, receivedCount);
}

public void Dispose()
Expand Down

0 comments on commit 58a4a6b

Please sign in to comment.