diff --git a/src/Libplanet.Net/Protocols/ReqRepProtocol.cs b/src/Libplanet.Net/Protocols/ReqRepProtocol.cs index 484d9f52aa..a5070476c9 100644 --- a/src/Libplanet.Net/Protocols/ReqRepProtocol.cs +++ b/src/Libplanet.Net/Protocols/ReqRepProtocol.cs @@ -46,6 +46,10 @@ public async Task DialAsync( try { _transport.RequestMessageToSend += eventHandler; + _logger.Debug( + "Ready to send a request message to {RemotePeer} as {LocalPeer}", + context.RemotePeer.Address, + context.LocalPeer.Address); await SendAndReceiveMessage(request, downChannel, context); } finally diff --git a/src/Libplanet.Net/Transports/Libp2pTransport.cs b/src/Libplanet.Net/Transports/Libp2pTransport.cs index f1e700a393..12278d281f 100644 --- a/src/Libplanet.Net/Transports/Libp2pTransport.cs +++ b/src/Libplanet.Net/Transports/Libp2pTransport.cs @@ -5,7 +5,6 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using Dasync.Collections; using Libplanet.Crypto; using Libplanet.Net.Messages; using Libplanet.Net.Options; @@ -21,6 +20,8 @@ namespace Libplanet.Net.Transports public class Libp2pTransport : ITransport #pragma warning restore S101 { + public const int DialProtocolDelay = 100; + private readonly PrivateKey _privateKey; private readonly ILogger _logger; private readonly HostOptions _hostOptions; @@ -33,6 +34,7 @@ public class Libp2pTransport : ITransport private Multiaddress? _listenerAddress = null; private CancellationTokenSource _runtimeCancellationTokenSource; + private bool _running = false; private bool _disposed = false; public Libp2pTransport( @@ -81,7 +83,7 @@ public Libp2pTransport( public DateTimeOffset? LastMessageTimestamp { get; } - public bool Running { get; } + public bool Running => _running; public AppProtocolVersion AppProtocolVersion { get; } @@ -113,12 +115,14 @@ public static async Task Create( public Task StartAsync(CancellationToken cancellationToken = default) { // Does nothing. + _running = true; return Task.CompletedTask; } public Task StopAsync(TimeSpan waitFor, CancellationToken cancellationToken = default) { // Does nothing. + _running = false; _runtimeCancellationTokenSource.Cancel(); return Task.CompletedTask; } @@ -155,13 +159,6 @@ public async Task> SendMessageAsync( bool returnWhenTimeout, CancellationToken cancellationToken) { - _logger.Information( - "Trying to dial {Remote} As {LocalPeer}", - peer.Multiaddress, - LocalPeer.Address); - IRemotePeer remote = await LocalPeer.DialAsync(peer.Multiaddress, default); - _logger.Information("Dialing to {Remote} successful", peer.Multiaddress); - // FIXME: There should be default maximum timeout. CancellationTokenSource timerCts = new CancellationTokenSource(); if (timeout is { } timeoutNotNull) @@ -175,6 +172,16 @@ public async Task> SendMessageAsync( cancellationToken, timerCts.Token); + _logger.Verbose( + "Trying to dial {Remote} as {LocalPeer}", + peer.Multiaddress, + LocalPeer.Address); + IRemotePeer remote = await LocalPeer.DialAsync(peer.Multiaddress); + _logger.Verbose( + "Dialing to {Remote} as {LocalPeer} was successful", + peer.Multiaddress, + LocalPeer.Address); + // FIXME: Add logging. _ = remote.DialAsync(linkedCts.Token); @@ -188,12 +195,12 @@ public async Task> SendMessageAsync( // FIXME: The tasks may not be ready to consume the message. // There needs to be a way to know whether the connection is ready // to consume the message. - await Task.Delay(100); + await Task.Delay(DialProtocolDelay); Channel inboundReplyChannel = Channel.CreateUnbounded(); _logger.Information("Invoking sending message"); RequestMessageToSend?.Invoke( this, - (peer.Multiaddress, message, 1, inboundReplyChannel)); + (remote.Address, message, 1, inboundReplyChannel)); List replyMessages = new List(); @@ -248,26 +255,33 @@ public async Task> SendMessageAsync( replyMessages.Count); } - return returnWhenTimeout - ? replyMessages - : new List(); +#pragma warning disable S3358 // Extract this ternary expresion. + return timerCts.IsCancellationRequested + ? returnWhenTimeout + ? replyMessages + : new List() + : replyMessages; +#pragma warning restore S3358 } public void BroadcastMessage(IEnumerable peers, MessageContent content) { if (_disposed) { - throw new ObjectDisposedException(nameof(NetMQTransport)); + throw new ObjectDisposedException(nameof(Libp2pTransport)); } CancellationToken ct = _runtimeCancellationTokenSource.Token; List boundPeers = peers.ToList(); - Task.Run( - async () => - await boundPeers.ParallelForEachAsync( - peer => SendMessageAsync(peer, content, TimeSpan.FromSeconds(1), ct), - ct), - ct); + + // FIXME: Parallel does not work. + // Also should catch an exception and ignore it. + foreach (var boundPeer in boundPeers) + { + _ = SendMessageAsync(boundPeer, content, TimeSpan.FromSeconds(1), ct) + .GetAwaiter() + .GetResult(); + } _logger.Debug( "Broadcasting message {Message} as {AsPeer} to {PeerCount} peers", @@ -298,16 +312,12 @@ private async Task Initialize( { // FIXME: Host being null should be dealt with. _logger.Information("Initialization started"); - string localPeerAddressTemplate = $"/ip4/{_hostOptions.Host}/tcp/0"; - string listenerAddressTemplate = $"/ip4/{_hostOptions.Host}/tcp/{_hostOptions.Port}"; + string addressTemplate = $"/ip4/{_hostOptions.Host}/tcp/{_hostOptions.Port}"; IPeerFactory peerFactory = serviceProvider.GetService()!; - _logger.Information("Peer factory obtained"); ILocalPeer localPeer = peerFactory.Create( - CryptoKeyConverter.ToLibp2pIdentity(_privateKey), - localPeerAddressTemplate); + CryptoKeyConverter.ToLibp2pIdentity(_privateKey), addressTemplate); _logger.Information("Local peer created at {LocalPeerAddress}", localPeer.Address); - IListener listener = await localPeer.ListenAsync( - listenerAddressTemplate, cancellationToken); + IListener listener = await localPeer.ListenAsync(addressTemplate, cancellationToken); _logger.Information("Listener started at {ListenerAddress}", listener.Address); _localPeer = localPeer; diff --git a/test/Libplanet.Net.Tests/Transports/Libp2pTransportTest.cs b/test/Libplanet.Net.Tests/Transports/Libp2pTransportTest.cs index b4443a818c..5182a2045e 100644 --- a/test/Libplanet.Net.Tests/Transports/Libp2pTransportTest.cs +++ b/test/Libplanet.Net.Tests/Transports/Libp2pTransportTest.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Net; +using System.Threading; using System.Threading.Tasks; using Libplanet.Crypto; using Libplanet.Net.Messages; @@ -18,8 +19,10 @@ namespace Libplanet.Net.Tests.Transports { + [CollectionDefinition(nameof(Libp2pTransportTest), DisableParallelization = true)] public class Libp2pTransportTest : IDisposable { + public const int Timeout = 30_000; private bool _disposed; private ILogger _logger; @@ -41,7 +44,7 @@ public Libp2pTransportTest(ITestOutputHelper testOutputHelper) Dispose(false); } - [Fact(Timeout = 10_000)] + [Fact(Timeout = Timeout)] public async Task Initialize() { PrivateKey privateKey = new PrivateKey(); @@ -67,10 +70,16 @@ public async Task Initialize() Assert.Equal(expected, transport.AsPeer); } - [Fact(Timeout = 10_000)] - public async Task DialToListeners() + [Theory(Timeout = Timeout)] + [InlineData(true)] + [InlineData(false)] + public async Task DialToListeners(bool usePortZero) { + // NOTE: Using port 0 does not work for this test. int count = 2; + List freePorts = usePortZero + ? Enumerable.Range(0, count).Select(_ => 0).ToList() + : TestUtils.GetFreePorts(count); List privateKeys = Enumerable .Range(0, count) .Select(_ => new PrivateKey()) @@ -80,7 +89,7 @@ public async Task DialToListeners() .Select(i => new Libp2pTransport( privateKeys[i], new AppProtocolVersionOptions(), - new HostOptions("127.0.0.1", new IceServer[] { }, 0))) + new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[i]))) .ToList(); List serviceProviders = transports .Select(transport => GetServiceProvider(transport)) @@ -92,11 +101,12 @@ public async Task DialToListeners() .Range(0, count) .Select(i => peerFactories[i].Create( CryptoKeyConverter.ToLibp2pIdentity(privateKeys[i]), - "/ip4/127.0.0.1/tcp/0")) + $"/ip4/127.0.0.1/tcp/{freePorts[i]}")) .ToList(); - List listeners = localPeers - .Select(async localPeer => - await localPeer.ListenAsync("/ip4/127.0.0.1/tcp/0", default)) + List listeners = Enumerable + .Range(0, count) + .Select(async i => + await localPeers[i].ListenAsync($"/ip4/127.0.0.1/tcp/{freePorts[i]}", default)) .Select(task => task.Result) .ToList(); List listenerAddresses = listeners @@ -110,15 +120,37 @@ await localPeer.ListenAsync("/ip4/127.0.0.1/tcp/0", default)) Assert.Equal(listenerAddresses[0], remote1.Address); } - [Fact(Timeout = 10_000)] - public async Task DialToTransports() + [Fact(Timeout = Timeout)] + public async Task DialCancel() + { + PrivateKey privateKey = new PrivateKey(); + List freePorts = TestUtils.GetFreePorts(2); + Libp2pTransport transport = await Libp2pTransport.Create( + privateKey, + new AppProtocolVersionOptions(), + new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[0])); + + Identity identity = CryptoKeyConverter.ToLibp2pIdentity(new PrivateKey()); + Multiaddress badAddress = $"/ip4/127.0.0.1/tcp/{freePorts[1]}/p2p/{identity.PeerId}"; + CancellationTokenSource cts = new CancellationTokenSource(); + cts.CancelAfter(1_000); + await Assert.ThrowsAsync( + async () => await transport.LocalPeer.DialAsync(badAddress, cts.Token)); + } + + [Theory(Timeout = Timeout)] + [InlineData(true)] + [InlineData(false)] + public async Task DialToTransports(bool usePortZero) { int count = 2; + List freePorts = usePortZero + ? Enumerable.Range(0, count).Select(_ => 0).ToList() + : TestUtils.GetFreePorts(count); List privateKeys = Enumerable .Range(0, count) .Select(_ => new PrivateKey()) .ToList(); - List freePorts = TestUtils.GetFreePorts(2); List hosts = freePorts .Select(freePort => new HostOptions("127.0.0.1", new IceServer[] { }, freePort)) .ToList(); @@ -140,15 +172,19 @@ public async Task DialToTransports() Assert.Equal(transports[0].ListenerAddress, remote1.Address); } - [Fact(Timeout = 10_000)] - public async Task RequestReply() + [Theory(Timeout = Timeout)] + [InlineData(true)] + [InlineData(false)] + public async Task RequestReply(bool usePortZero) { int count = 2; + List freePorts = usePortZero + ? Enumerable.Range(0, count).Select(_ => 0).ToList() + : TestUtils.GetFreePorts(count); List privateKeys = Enumerable .Range(0, count) .Select(_ => new PrivateKey()) .ToList(); - List freePorts = TestUtils.GetFreePorts(2); List transports = Enumerable .Range(0, count) .Select(async i => await Libp2pTransport.Create( @@ -166,15 +202,54 @@ public async Task RequestReply() } }); - List reply = (await transports[0].SendMessageAsync( + Message reply = await transports[0].SendMessageAsync( transports[1].AsPeer, new PingMsg(), TimeSpan.FromSeconds(5), - 1, - true, - default)).ToList(); - Message single = Assert.Single(reply); - Assert.IsType(single.Content); + default); + Assert.IsType(reply.Content); + } + + [Theory(Timeout = Timeout)] + [InlineData(true)] + [InlineData(false)] + public async Task Broadcast(bool usePortZero) + { + int count = 4; + List freePorts = usePortZero + ? Enumerable.Range(0, count).Select(_ => 0).ToList() + : TestUtils.GetFreePorts(count); + List privateKeys = Enumerable + .Range(0, count) + .Select(_ => new PrivateKey()) + .ToList(); + List transports = Enumerable + .Range(0, count) + .Select(async i => await Libp2pTransport.Create( + privateKeys[i], + new AppProtocolVersionOptions(), + new HostOptions("127.0.0.1", new IceServer[] { }, freePorts[i]))) + .Select(task => task.Result) + .ToList(); + + int receivedCount = 0; + foreach (var transport in transports.Skip(1).ToList()) + { + transport.ProcessMessageHandler.Register(async (message, channel) => + { + if (message.Content is PingMsg) + { + await channel.Writer.WriteAsync(new PongMsg()); + receivedCount++; + } + }); + } + + transports[0].BroadcastMessage( + transports.Skip(1).Select(transport => transport.AsPeer).ToList(), + new PingMsg()); + await Task.Delay(1_000); + Assert.Equal(count - 1, receivedCount); } public void Dispose()