Skip to content

Commit

Permalink
LocalGrainDirectory.UnregisterManyAsync should always be called from …
Browse files Browse the repository at this point in the history
…RemoteGrainDirectory context (#6575)
  • Loading branch information
benjaminpetit authored Jun 4, 2020
1 parent 97b7cd5 commit 6c7b270
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 10 deletions.
18 changes: 15 additions & 3 deletions src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading;
using System.Threading.Tasks;
using Orleans.GrainDirectory;
using Orleans.Runtime.Scheduler;

namespace Orleans.Runtime.GrainDirectory
{
Expand All @@ -14,12 +15,19 @@ namespace Orleans.Runtime.GrainDirectory
internal class DhtGrainLocator : IGrainLocator
{
private readonly ILocalGrainDirectory localGrainDirectory;
private readonly OrleansTaskScheduler taskScheduler;
private readonly IGrainContext grainContext;
private readonly ConcurrentQueue<(TaskCompletionSource<object> tcs, ActivationAddress address, UnregistrationCause cause)> unregistrationQueue = new ConcurrentQueue<(TaskCompletionSource<object> tcs, ActivationAddress address, UnregistrationCause cause)>();
private int isWorking = 0;

public DhtGrainLocator(ILocalGrainDirectory localGrainDirectory)
public DhtGrainLocator(
ILocalGrainDirectory localGrainDirectory,
OrleansTaskScheduler taskScheduler,
IGrainContext grainContext)
{
this.localGrainDirectory = localGrainDirectory;
this.taskScheduler = taskScheduler;
this.grainContext = grainContext;
}

public async Task<List<ActivationAddress>> Lookup(GrainId grainId)
Expand All @@ -39,14 +47,18 @@ public bool TryLocalLookup(GrainId grainId, out List<ActivationAddress> addresse
public async Task<ActivationAddress> Register(ActivationAddress address)
=> (await this.localGrainDirectory.RegisterAsync(address, singleActivation: true)).Address;

public Task Unregister(ActivationAddress address, UnregistrationCause cause)
public Task Unregister(ActivationAddress address, UnregistrationCause cause)
{
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
this.unregistrationQueue.Enqueue((tcs, address, cause));
UnregisterExecute().Ignore();
// Make sure to not run the loop on the Grain Activation context
this.taskScheduler.RunOrQueueTask(() => this.UnregisterExecute(), this.grainContext).Ignore();
return tcs.Task;
}

public static DhtGrainLocator FromLocalGrainDirectory(LocalGrainDirectory localGrainDirectory)
=> new DhtGrainLocator(localGrainDirectory, localGrainDirectory.Scheduler, localGrainDirectory.RemoteGrainDirectory);

private async Task UnregisterExecute()
{
while (!this.unregistrationQueue.IsEmpty)
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ internal static void AddDefaultServices(IApplicationPartManager applicationPartM
services.TryAddSingleton<ActivationCollector>();
services.TryAddSingleton<LocalGrainDirectory>();
services.TryAddFromExisting<ILocalGrainDirectory, LocalGrainDirectory>();
services.AddSingleton<DhtGrainLocator>();
services.AddSingleton<DhtGrainLocator>(sp => DhtGrainLocator.FromLocalGrainDirectory(sp.GetService<LocalGrainDirectory>()));
services.AddSingleton<IGrainDirectoryResolver, GrainDirectoryResolver>();
if (GrainDirectoryResolver.HasAnyRegisteredGrainDirectory(services))
{
Expand Down
20 changes: 17 additions & 3 deletions test/NonSilo.Tests/Directory/CachedGrainLocatorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
using Orleans.GrainDirectory;
using Orleans.Runtime;
using Orleans.Runtime.GrainDirectory;
using Orleans.Runtime.Scheduler;
using Orleans.Runtime.Utilities;
using TestExtensions;
using UnitTests.SchedulerTests;
using UnitTests.TesterInternal;
using Xunit;
using Xunit.Abstractions;

namespace UnitTests.Directory
{
[TestCategory("BVT"), TestCategory("Directory")]
public class CachedGrainLocatorTests
public class CachedGrainLocatorTests : IDisposable
{
private readonly LoggerFactory loggerFactory;
private readonly SiloLifecycleSubject lifecycle;
Expand All @@ -29,7 +32,8 @@ public class CachedGrainLocatorTests
private readonly IGrainDirectoryResolver grainDirectoryResolver;
private readonly ILocalGrainDirectory localGrainDirectory;
private readonly MockClusterMembershipService mockMembershipService;

private readonly UnitTestSchedulingContext rootContext;
private readonly OrleansTaskScheduler taskScheduler;
private readonly CachedGrainLocator grainLocator;

public CachedGrainLocatorTests(ITestOutputHelper output)
Expand All @@ -43,10 +47,12 @@ public CachedGrainLocatorTests(ITestOutputHelper output)
this.grainDirectoryResolver.Directories.Returns(new[] { this.grainDirectory });
this.localGrainDirectory = Substitute.For<ILocalGrainDirectory>();
this.mockMembershipService = new MockClusterMembershipService();
this.rootContext = new UnitTestSchedulingContext();
this.taskScheduler = TestInternalHelper.InitializeSchedulerForTesting(this.rootContext, this.loggerFactory);

this.grainLocator = new CachedGrainLocator(
this.grainDirectoryResolver,
new DhtGrainLocator(this.localGrainDirectory),
new DhtGrainLocator(this.localGrainDirectory, this.taskScheduler, this.rootContext),
this.mockMembershipService.Target);

this.grainLocator.Participate(this.lifecycle);
Expand Down Expand Up @@ -276,5 +282,13 @@ private static async Task Until(Func<bool> condition)
while (!condition() && (maxTimeout -= 10) > 0) await Task.Delay(10);
Assert.True(maxTimeout > 0);
}

public void Dispose()
{
if (this.taskScheduler != null)
{
this.taskScheduler.Stop();
}
}
}
}
27 changes: 24 additions & 3 deletions test/NonSilo.Tests/Directory/DhtGrainLocatorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,39 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.GrainDirectory;
using Orleans.Runtime;
using Orleans.Runtime.GrainDirectory;
using Orleans.Runtime.Scheduler;
using TestExtensions;
using UnitTests.SchedulerTests;
using UnitTests.TesterInternal;
using Xunit;
using Xunit.Abstractions;

namespace UnitTests.Directory
{
[TestCategory("BVT"), TestCategory("Directory")]
public class DhtGrainLocatorTests
public class DhtGrainLocatorTests : IDisposable
{
private readonly DhtGrainLocator target;
private readonly MockLocalGrainDirectory localGrainDirectory;
private readonly ITestOutputHelper output;
private readonly LoggerFactory loggerFactory;
private readonly UnitTestSchedulingContext rootContext;
private readonly OrleansTaskScheduler taskScheduler;

public DhtGrainLocatorTests(ITestOutputHelper output)
{
this.output = output;
this.localGrainDirectory = new MockLocalGrainDirectory(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(200));
this.target = new DhtGrainLocator(this.localGrainDirectory);
this.loggerFactory = new LoggerFactory(new[] { new XunitLoggerProvider(output) });
this.rootContext = new UnitTestSchedulingContext();
this.taskScheduler = TestInternalHelper.InitializeSchedulerForTesting(this.rootContext, this.loggerFactory);
this.localGrainDirectory = new MockLocalGrainDirectory(
TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(200));
this.target = new DhtGrainLocator(this.localGrainDirectory, this.taskScheduler, this.rootContext);
}

[Fact]
Expand Down Expand Up @@ -108,5 +121,13 @@ private ActivationAddress GenerateActivationAddress()

return ActivationAddress.NewActivationAddress(siloAddr, grainId);
}

public void Dispose()
{
if (this.taskScheduler != null)
{
this.taskScheduler.Stop();
}
}
}
}

0 comments on commit 6c7b270

Please sign in to comment.