Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
Still requires tests
  • Loading branch information
rkargMsft committed Dec 19, 2024
1 parent 4777dae commit ed20ef3
Show file tree
Hide file tree
Showing 23 changed files with 682 additions and 3 deletions.
5 changes: 5 additions & 0 deletions src/Orleans.Core.Abstractions/Manifest/GrainProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static class WellKnownGrainTypeProperties
/// </summary>
public const string PlacementStrategy = "placement-strategy";

/// <summary>
/// The name of the placement strategy for grains of this type.
/// </summary>
public const string PlacementFilter = "placement-filter";

/// <summary>
/// The directory policy for grains of this type.
/// </summary>
Expand Down
7 changes: 5 additions & 2 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@
using Orleans.Serialization.Internal;
using Orleans.Core;
using Orleans.Placement.Repartitioning;
using Orleans.GrainDirectory;
using Orleans.Runtime.Hosting;
using Orleans.Runtime.Placement.Filtering;

namespace Orleans.Hosting
{
Expand Down Expand Up @@ -206,6 +205,10 @@ internal static void AddDefaultServices(ISiloBuilder builder)
// Configure the default placement strategy.
services.TryAddSingleton<PlacementStrategy, RandomPlacement>();

// Placement filters
services.AddSingleton<PlacementFilterStrategyResolver>();
services.AddSingleton<PlacementFilterDirectorResolver>();

// Placement directors
services.AddPlacementDirector<RandomPlacement, RandomPlacementDirector>();
services.AddPlacementDirector<PreferLocalPlacement, PreferLocalPlacementDirector>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Orleans.Runtime.MembershipService.SiloMetadata;
#nullable enable
public interface ISiloMetadataCache
{
SiloMetadata GetMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading.Tasks;
using Orleans.Services;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public interface ISiloMetadataClient : IGrainServiceClient<ISiloMetadataGrainService>
{
Task<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Threading.Tasks;
using Orleans.Services;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

[Alias("Orleans.Runtime.MembershipService.SiloMetadata.ISiloMetadataGrainService")]
public interface ISiloMetadataGrainService : IGrainService
{
[Alias("GetSiloMetadata")]
Task<SiloMetadata> GetSiloMetadata();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Internal;

namespace Orleans.Runtime.MembershipService.SiloMetadata;
#nullable enable
internal class SiloMetadataCache(
ISiloMetadataClient siloMetadataClient,
MembershipTableManager membershipTableManager,
ILogger<SiloMetadataCache> logger)
: ISiloMetadataCache, ILifecycleParticipant<ISiloLifecycle>, IDisposable
{
private readonly ConcurrentDictionary<SiloAddress, SiloMetadata> _metadata = new();
private readonly CancellationTokenSource _cts = new();

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
var tasks = new List<Task>(1);
var cancellation = new CancellationTokenSource();
Task OnRuntimeInitializeStart(CancellationToken _)
{
tasks.Add(Task.Run(() => this.ProcessMembershipUpdates(cancellation.Token)));
return Task.CompletedTask;
}

async Task OnRuntimeInitializeStop(CancellationToken ct)
{
cancellation.Cancel(throwOnFirstException: false);
var shutdownGracePeriod = Task.WhenAll(Task.Delay(ClusterMembershipOptions.ClusteringShutdownGracePeriod), ct.WhenCancelled());
await Task.WhenAny(shutdownGracePeriod, Task.WhenAll(tasks));
}

lifecycle.Subscribe(
nameof(ClusterMembershipService),
ServiceLifecycleStage.RuntimeInitialize,
OnRuntimeInitializeStart,
OnRuntimeInitializeStop);
}


private async Task ProcessMembershipUpdates(CancellationToken ct)
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Starting to process membership updates");
await foreach (var update in membershipTableManager.MembershipTableUpdates.WithCancellation(ct))
{
// Add entries for members that aren't already in the cache
foreach (var membershipEntry in update.Entries)
{
if (!_metadata.ContainsKey(membershipEntry.Key))
{
try
{
var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key);
_metadata.TryAdd(membershipEntry.Key, metadata);
}
catch(Exception exception)
{
logger.LogError(exception, "Error fetching metadata for silo {Silo}", membershipEntry.Key);
}
}
}

// Remove entries for members that are no longer in the table
foreach (var silo in _metadata.Keys.ToList())
{
if (!update.Entries.ContainsKey(silo))
{
_metadata.TryRemove(silo, out _);
}
}
}
}
catch (Exception exception)
{
logger.LogError(exception, "Error processing membership updates");
}
finally
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Stopping membership update processor");
}
}

public SiloMetadata GetMetadata(SiloAddress siloAddress) => _metadata.GetValueOrDefault(siloAddress) ?? SiloMetadata.Empty;

public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata);

public void Dispose() => _cts.Cancel();
}
17 changes: 17 additions & 0 deletions src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Collections.Generic;
using System.Collections.Immutable;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

[GenerateSerializer]
[Alias("Orleans.Runtime.MembershipService.SiloMetadata.SiloMetadata")]
public record SiloMetadata
{
public static SiloMetadata Empty { get; } = new SiloMetadata();

[Id(0)]
public ImmutableDictionary<string, string> Metadata { get; private set; } = ImmutableDictionary<string, string>.Empty;

internal void AddMetadata(IEnumerable<KeyValuePair<string, string>> metadata) => Metadata = Metadata.AddRange(metadata);
internal void AddMetadata(string key, string value) => Metadata = Metadata.Add(key, value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Threading.Tasks;
using Orleans.Runtime.Services;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public class SiloMetadataClient(IServiceProvider serviceProvider)
: GrainServiceClient<ISiloMetadataGrainService>(serviceProvider), ISiloMetadataClient
{
public async Task<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress)
{
var grainService = GetGrainService(siloAddress);
var metadata = await grainService.GetSiloMetadata();
return metadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public class SiloMetadataGrainService : GrainService, ISiloMetadataGrainService
{
private readonly SiloMetadata _siloMetadata;

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata) : base()
{
_siloMetadata = siloMetadata.Value;
}

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata, GrainId grainId, Silo silo, ILoggerFactory loggerFactory) : base(grainId, silo, loggerFactory)
{
_siloMetadata = siloMetadata.Value;
}

public Task<SiloMetadata> GetSiloMetadata() => Task.FromResult(_siloMetadata);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration.Internal;
using Orleans.Hosting;
using Orleans.Runtime.Placement.Filtering;

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public static class SiloMetadataHostingExtensions
{

/// <summary>
/// Configure silo metadata from the builder configuration.
/// </summary>
/// <param name="builder">Silo builder</param>
/// <remarks>
/// Get the ORLEANS__METADATA section from config
/// Key/value pairs in configuration as a <see cref="Dictionary{TKey,TValue}"/> will look like this as environment variables:
/// ORLEANS__METADATA__key1=value1
/// </remarks>
/// <returns></returns>
public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder) => builder.UseSiloMetadata(builder.Configuration);

/// <summary>
/// Configure silo metadata from configuration.
/// </summary>
/// <param name="builder">Silo builder</param>
/// <param name="configuration">Configuration to pull from</param>
/// <remarks>
/// Get the ORLEANS__METADATA section from config
/// Key/value pairs in configuration as a <see cref="Dictionary{TKey,TValue}"/> will look like this as environment variables:
/// ORLEANS__METADATA__key1=value1
/// </remarks>
/// <returns></returns>
public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, IConfiguration configuration)
{

var metadataConfigSection = builder.Configuration.GetSection("ORLEANS").GetSection("METADATA");

return builder.UseSiloMetadata(metadataConfigSection);
}

/// <summary>
/// Configure silo metadata from configuration section.
/// </summary>
/// <param name="builder">Silo builder</param>
/// <param name="configurationSection">Configuration section to pull from</param>
/// <remarks>
/// Get the ORLEANS__METADATA section from config section
/// Key/value pairs in configuration as a <see cref="Dictionary{TKey,TValue}"/> will look like this as environment variables:
/// ORLEANS__METADATA__key1=value1
/// </remarks>
/// <returns></returns>
public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, IConfigurationSection configurationSection)
{
var dictionary = configurationSection.Get<Dictionary<string, string>>();

return builder.UseSiloMetadata(dictionary ?? new Dictionary<string, string>());
}

/// <summary>
/// Configure silo metadata from configuration section.
/// </summary>
/// <param name="builder">Silo builder</param>
/// <param name="metadata">Metadata to add</param>
/// <returns></returns>
public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, Dictionary<string, string> metadata)
{
builder.ConfigureServices(services =>
{
services
.AddOptionsWithValidateOnStart<SiloMetadata>()
.Configure(m =>
{
m.AddMetadata(metadata);
});

services.AddGrainService<SiloMetadataGrainService>();
services.AddSingleton<SiloMetadataCache>();
services.AddFromExisting<ISiloMetadataCache, SiloMetadataCache>();
services.AddFromExisting<ILifecycleParticipant<ISiloLifecycle>, SiloMetadataCache>();
services.AddSingleton<ISiloMetadataClient, SiloMetadataClient>();
// Placement filters
services.AddPlacementFilter<PreferredSiloMetadataPlacementFilterStrategy, PreferredSiloMetadataPlacementFilterDirector>(ServiceLifetime.Transient);
services.AddPlacementFilter<RequiredSiloMetadataPlacementFilterStrategy, RequiredSiloMetadataFilterDirector>(ServiceLifetime.Transient);
});
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Collections.Generic;

namespace Orleans.Runtime.Placement.Filtering;

public interface IPlacementFilterDirector
{
IEnumerable<SiloAddress> Filter(PlacementFilterStrategy filterStrategy, PlacementTarget target,
IEnumerable<SiloAddress> silos);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using Orleans.Metadata;

namespace Orleans.Runtime.Placement.Filtering;

/// <summary>
/// Base for all placement filter marker attributes.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public abstract class PlacementFilterAttribute : Attribute, IGrainPropertiesProviderAttribute
{
public PlacementFilterStrategy PlacementFilterStrategy { get; private set; }

protected PlacementFilterAttribute(PlacementFilterStrategy placement)
{
ArgumentNullException.ThrowIfNull(placement);
PlacementFilterStrategy = placement;
}

/// <inheritdoc />
public virtual void Populate(IServiceProvider services, Type grainClass, GrainType grainType, Dictionary<string, string> properties)
=> PlacementFilterStrategy?.PopulateGrainProperties(services, grainClass, grainType, properties);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Runtime.Placement.Filtering;

/// <summary>
/// Responsible for resolving an <see cref="IPlacementFilterDirector"/> for a <see cref="PlacementFilterStrategy"/>.
/// </summary>
public sealed class PlacementFilterDirectorResolver(IServiceProvider services)
{
public IPlacementFilterDirector GetFilterDirector(PlacementFilterStrategy placementFilterStrategy) => services.GetRequiredKeyedService<IPlacementFilterDirector>(placementFilterStrategy.GetType());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.DependencyInjection;

namespace Orleans.Runtime.Placement.Filtering;

public static class PlacementFilterExtensions
{
/// <summary>
/// Configures a <typeparamref name="TFilter"/> for filtering candidate grain placements.
/// </summary>
/// <typeparam name="TFilter">The placement filter.</typeparam>
/// <typeparam name="TDirector">The placement filter director.</typeparam>
/// <param name="services">The service collection.</param>
/// <param name="strategyLifetime">The lifetime of the placement strategy.</param>
/// <returns>The service collection.</returns>
public static void AddPlacementFilter<TFilter, TDirector>(this IServiceCollection services, ServiceLifetime strategyLifetime)
where TFilter : PlacementFilterStrategy
where TDirector : class, IPlacementFilterDirector
{
services.Add(ServiceDescriptor.DescribeKeyed(typeof(PlacementFilterStrategy), typeof(TFilter).Name, typeof(TFilter), strategyLifetime));
services.AddKeyedSingleton<IPlacementFilterDirector, TDirector>(typeof(TFilter));
}

}
Loading

0 comments on commit ed20ef3

Please sign in to comment.