Skip to content

Commit

Permalink
Reworked the handling of async results within the resolver task (#7152)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib authored Jul 1, 2024
1 parent 51933a6 commit 07388b4
Show file tree
Hide file tree
Showing 131 changed files with 1,976 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ public Task Send_Subscribe_SyntaxError()
async ct =>
{
// arrange
snapshot.Clear();
using var testServer = CreateStarWarsServer();
var client = CreateWebSocketClient(testServer);
using var webSocket = await ConnectToServerAsync(client, ct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public Task Subscribe_ReceiveDataOnMutation()
await testServer.SendPostRequestAsync(
new ClientQueryRequest
{
Query =
Query =
"""
mutation {
createReview(episode: NEW_HOPE review: {
Expand Down Expand Up @@ -521,19 +521,19 @@ await TryTest(
"subscription { onReview(episode: NEW_HOPE) { stars } }");

var stopwatch = Stopwatch.StartNew();

for (var i = 0; i < 600; i++)
{
await webSocket.SendSubscribeAsync(i.ToString(), payload, ct);
}

while(diagnostics.Subscribed < 600)
{
await Task.Delay(10, ct);
}

_output.WriteLine($"Subscribed in {stopwatch.ElapsedMilliseconds}ms");

await testServer.SendPostRequestAsync(
new ClientQueryRequest
{
Expand Down Expand Up @@ -712,6 +712,7 @@ public Task Send_Pong_With_Payload()
async ct =>
{
// arrange
snapshot.Clear();
var interceptor = new PingPongInterceptor();
using var testServer = CreateStarWarsServer(
configureServices: s => s
Expand Down Expand Up @@ -957,9 +958,9 @@ public override ValueTask OnPongAsync(
public sealed class SubscriptionTestDiagnostics : SubscriptionDiagnosticEventsListener
{
private int _subscribed;

public int Subscribed => _subscribed;

public bool UnsubscribeInvoked { get; private set; }

public bool CloseInvoked { get; private set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using static HotChocolate.ExecutableErrorHelper;

namespace HotChocolate;

internal sealed class DefaultAsyncEnumerableExecutable<T>(IAsyncEnumerable<T> source) : Executable<T>
{
public override object Source => source;

public override async ValueTask<T?> FirstOrDefaultAsync(CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
return await enumerator.MoveNextAsync() ? enumerator.Current : default;
}

public override async ValueTask<T?> SingleOrDefaultAsync(CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);

if (await enumerator.MoveNextAsync())
{
var result = enumerator.Current;

if (await enumerator.MoveNextAsync())
{
throw new GraphQLException(SequenceContainsMoreThanOneElement());
}

return result;
}

return default;
}

public override async ValueTask<List<T>> ToListAsync(CancellationToken cancellationToken = default)
{
var result = new List<T>();

await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
result.Add(element);
}

return result;
}

public override async IAsyncEnumerable<T> ToAsyncEnumerable(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await foreach (var element in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
yield return element;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@

using System;
using System.Collections;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using static HotChocolate.ExecutableErrorHelper;

namespace HotChocolate;

internal sealed class DefaultEnumerableExecutable(IEnumerable source) : IExecutable
{
public object Source => source;

public ValueTask<IList> ToListAsync(CancellationToken cancellationToken = default)
{
var list = new List<object?>();

foreach (var item in source)
{
list.Add(item);
}

return new ValueTask<IList>(list);
}

public async IAsyncEnumerable<object?> ToAsyncEnumerable(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (source is IAsyncEnumerable<object?> stream)
{
await foreach (var element in stream.WithCancellation(cancellationToken).ConfigureAwait(false))
{
yield return element;
}
}
else
{
foreach (var item in source)
{
yield return item;
}
}
}

public ValueTask<object?> FirstOrDefaultAsync(CancellationToken cancellationToken = default)
{
var enumerator = source.GetEnumerator();

try
{
return enumerator.MoveNext()
? new ValueTask<object?>(enumerator.Current)
: new ValueTask<object?>(default(object?));
}
finally
{
if(enumerator is IDisposable disposable)
{
disposable.Dispose();
}
}
}

public ValueTask<object?> SingleOrDefaultAsync(CancellationToken cancellationToken = default)
{
var enumerator = source.GetEnumerator();

try
{
if (enumerator.MoveNext())
{
var obj = enumerator.Current;

if(enumerator.MoveNext())
{
throw new GraphQLException(SequenceContainsMoreThanOneElement());
}

return new ValueTask<object?>(obj);
}

return new ValueTask<object?>(default(object?));
}
finally
{
if(enumerator is IDisposable disposable)
{
disposable.Dispose();
}
}
}

public string Print() => Source.ToString() ?? Source.GetType().FullName ?? Source.GetType().Name;

public override string ToString() => Print();
}
Loading

0 comments on commit 07388b4

Please sign in to comment.