Skip to content

Commit

Permalink
temp: remove all async dealermethods
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed Dec 20, 2024
1 parent 6fe52ff commit 422e951
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
28 changes: 23 additions & 5 deletions src/Libplanet.Net/Transports/NetMQChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ public void Open()

public async IAsyncEnumerable<NetMQMessage> SendMessageAsync(
NetMQMessage message,
TimeSpan? timeout,
int expectedResponses,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<NetMQMessage>();
await _requests.Writer.WriteAsync(
new MessageRequest(
message,
timeout,
expectedResponses,
channel,
cancellationToken),
Expand All @@ -88,14 +90,19 @@ private async Task ProcessRuntime(CancellationToken ct)
{
using var dealer = new DealerSocket();
dealer.Options.DisableTimeWait = true;
dealer.Connect(await _peer.ResolveNetMQAddressAsync());
var address = await _peer.ResolveNetMQAddressAsync();
_logger.Debug("[NetMQChannel] Connecting {Address}", address);
dealer.Connect(address);
while (!ct.IsCancellationRequested)
{
MessageRequest req = await _requests.Reader.ReadAsync(ct);
_lastUpdated = DateTimeOffset.UtcNow;
CancellationTokenSource linked =
CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken);
_logger.Debug("[NetMQChannel] Trying to send message {Message}", req.Message);
_logger.Debug(
"[NetMQChannel] Trying to send message {Message} (count: {ExpectedResponses})",
req.Message,
req.ExpectedResponses);
if (!dealer.TrySendMultipartMessage(req.Message))
{
_logger.Debug(
Expand All @@ -109,9 +116,16 @@ private async Task ProcessRuntime(CancellationToken ct)

foreach (var i in Enumerable.Range(0, req.ExpectedResponses))
{
NetMQMessage raw = await dealer.ReceiveMultipartMessageAsync(
cancellationToken: linked.Token
);
_logger.Debug(
"[NetMQChannel] Waiting for replies... (#{Index})", i);
var raw = new NetMQMessage();
if (!dealer.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref raw))
{
break;
}

_logger.Debug(
"[NetMQChannel] Successfully received replies #{Index}", i);
_lastUpdated = DateTimeOffset.UtcNow;

await req.Channel.Writer.WriteAsync(raw, linked.Token);
Expand All @@ -125,18 +139,22 @@ private readonly struct MessageRequest
{
public MessageRequest(
NetMQMessage message,
TimeSpan? timeout,
in int expectedResponses,
Channel<NetMQMessage> channel,
CancellationToken cancellationToken)
{
Message = message;
Timeout = timeout;
ExpectedResponses = expectedResponses;
Channel = channel;
CancellationToken = cancellationToken;
}

public NetMQMessage Message { get; }

public TimeSpan? Timeout { get; }

public int ExpectedResponses { get; }

public Channel<NetMQMessage> Channel { get; }
Expand Down
1 change: 1 addition & 0 deletions src/Libplanet.Net/Transports/NetMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ CancellationToken cancellationToken

await foreach (var raw in channel.SendMessageAsync(
rawMessage,
timeout,
expectedResponses,
linkedCt))
{
Expand Down

0 comments on commit 422e951

Please sign in to comment.