Skip to content

Commit

Permalink
Fix a race condition in LifecycleSubject (#6481)
Browse files Browse the repository at this point in the history
(cherry picked from commit f0a9520)
  • Loading branch information
pentp authored and sergeybykov committed Apr 15, 2020
1 parent 91d23ba commit c294977
Showing 1 changed file with 29 additions and 28 deletions.
57 changes: 29 additions & 28 deletions src/Orleans.Core/Lifecycle/LifecycleSubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public virtual async Task OnStart(CancellationToken ct)
var stage = observerGroup.Key;
this.highStage = stage;
var stopWatch = ValueStopwatch.StartNew();
await Task.WhenAll(observerGroup.Select(orderedObserver => CallOnStart(observerGroup.Key, orderedObserver, ct)));
await Task.WhenAll(observerGroup.Select(orderedObserver => CallOnStart(orderedObserver, ct)));
stopWatch.Stop();
this.PerfMeasureOnStart(stage, stopWatch.Elapsed);

Expand All @@ -111,14 +111,21 @@ public virtual async Task OnStart(CancellationToken ct)
throw;
}

async Task CallOnStart(int stage, OrderedObserver observer, CancellationToken cancellationToken)
static Task CallOnStart(OrderedObserver observer, CancellationToken cancellationToken)
{
await observer.Observer.OnStart(cancellationToken);
try
{
return observer.Observer?.OnStart(cancellationToken) ?? Task.CompletedTask;
}
catch (Exception ex)
{
return Task.FromException(ex);
}
}
}

protected virtual void OnStartStageCompleted(int stage) { }

protected virtual void PerfMeasureOnStop(int stage, TimeSpan elapsed)
{
if (this.logger != null && this.logger.IsEnabled(LogLevel.Trace))
Expand All @@ -137,10 +144,10 @@ public virtual async Task OnStop(CancellationToken ct)
if (!this.highStage.HasValue) return;
var loggedCancellation = false;
foreach (IGrouping<int, OrderedObserver> observerGroup in this.subscribers
// include up to highest started stage
.Where(orderedObserver => orderedObserver.Stage <= highStage)
.GroupBy(orderedObserver => orderedObserver.Stage)
.OrderByDescending(group => group.Key)
// skip all until we hit the highest started stage
.SkipWhile(group => !this.highStage.Equals(group.Key)))
.OrderByDescending(group => group.Key))
{
if (ct.IsCancellationRequested && !loggedCancellation)
{
Expand All @@ -153,7 +160,7 @@ public virtual async Task OnStop(CancellationToken ct)
try
{
var stopwatch = ValueStopwatch.StartNew();
await Task.WhenAll(observerGroup.Select(orderedObserver => CallOnStop(observerGroup.Key, orderedObserver, ct)));
await Task.WhenAll(observerGroup.Select(orderedObserver => CallOnStop(orderedObserver, ct)));
stopwatch.Stop();
this.PerfMeasureOnStop(stage, stopwatch.Elapsed);
}
Expand All @@ -167,9 +174,16 @@ public virtual async Task OnStop(CancellationToken ct)
this.OnStopStageCompleted(stage);
}

async Task CallOnStop(int stage, OrderedObserver observer, CancellationToken cancellationToken)
static Task CallOnStop(OrderedObserver observer, CancellationToken cancellationToken)
{
await observer.Observer.OnStop(cancellationToken);
try
{
return observer.Observer?.OnStop(cancellationToken) ?? Task.CompletedTask;
}
catch (Exception ex)
{
return Task.FromException(ex);
}
}
}

Expand All @@ -182,34 +196,21 @@ public virtual IDisposable Subscribe(string observerName, int stage, ILifecycleO

var orderedObserver = new OrderedObserver(stage, observer);
this.subscribers.Add(orderedObserver);
return new Disposable(() => this.subscribers.Remove(orderedObserver));
return orderedObserver;
}

private class Disposable : IDisposable
private class OrderedObserver : IDisposable
{
private readonly Action dispose;

public Disposable(Action dispose)
{
this.dispose = dispose;
}

public void Dispose()
{
this.dispose();
}
}

private class OrderedObserver
{
public ILifecycleObserver Observer { get; }
public ILifecycleObserver Observer { get; private set; }
public int Stage { get; }

public OrderedObserver(int stage, ILifecycleObserver observer)
{
this.Stage = stage;
this.Observer = observer;
}

public void Dispose() => Observer = null;
}
}
}

0 comments on commit c294977

Please sign in to comment.