Skip to content

Commit

Permalink
Send status messages for long-running and blocked requests (#6672) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond authored Aug 18, 2020
1 parent e466b07 commit 6eec48d
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 36 deletions.
2 changes: 2 additions & 0 deletions src/Orleans.Core/Configuration/Options/MessagingOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public TimeSpan ResponseTimeout
get { return Debugger.IsAttached ? ResponseTimeoutWithDebugger : responseTimeout; }
set { this.responseTimeout = value; }
}

public static readonly TimeSpan DEFAULT_RESPONSE_TIMEOUT = TimeSpan.FromSeconds(30);
private TimeSpan responseTimeout = DEFAULT_RESPONSE_TIMEOUT;

Expand Down Expand Up @@ -83,6 +84,7 @@ public TimeSpan ResponseTimeout
/// if the body size is greater than this value.
/// </summary>
public int MaxMessageBodySize { get; set; } = DEFAULT_MAX_MESSAGE_BODY_SIZE;

public const int DEFAULT_MAX_MESSAGE_BODY_SIZE = 104857600; // 100MB
}
}
7 changes: 6 additions & 1 deletion src/Orleans.Core/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public enum ResponseTypes
{
Success,
Error,
Rejection
Rejection,
Status
}

public enum RejectionTypes
Expand Down Expand Up @@ -449,6 +450,10 @@ public override string ToString()
response = string.Format("{0} Rejection (info: {1}) ", RejectionType, RejectionInfo);
break;

case ResponseTypes.Status:
response = "Status ";
break;

default:
break;
}
Expand Down
12 changes: 12 additions & 0 deletions src/Orleans.Core/Messaging/MessageFactory.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Orleans.CodeGeneration;
using Orleans.Serialization;
Expand Down Expand Up @@ -146,5 +147,16 @@ public Message CreateRejectionResponse(Message request, Message.RejectionTypes t
if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.Debug("Creating {0} rejection with info '{1}' for {2} at:" + Environment.NewLine + "{3}", type, info, this, Utils.GetStackTrace());
return response;
}

internal Message CreateDiagnosticResponseMessage(Message request, bool isExecuting, bool isWaiting, List<string> diagnostics)
{
var response = this.CreateResponseMessage(request);
response.Result = Message.ResponseTypes.Status;
response.BodyObject = new StatusResponse(isExecuting, isWaiting, diagnostics);

if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("Creating {RequestMesssage} status update with diagnostics {Diagnostics}", request, diagnostics);

return response;
}
}
}
30 changes: 30 additions & 0 deletions src/Orleans.Core/Messaging/StatusResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Text;
using Orleans.Concurrency;

namespace Orleans.Runtime
{
[Immutable]
[Serializable]
internal class StatusResponse
{
private readonly uint _statusFlags;

public StatusResponse(bool isExecuting, bool isWaiting, List<string> diagnostics)
{
if (isExecuting) _statusFlags |= 0x1;
if (isWaiting) _statusFlags |= 0x2;

Diagnostics = diagnostics;
}

public List<string> Diagnostics { get; }

public bool IsExecuting => (_statusFlags & 0x1) != 0;

public bool IsWaiting => (_statusFlags & 0x2) != 0;

public override string ToString() => $"IsExecuting: {IsExecuting}, IsWaiting: {IsWaiting}, Diagnostics: [{string.Join(", ", this.Diagnostics)}]";
}
}
15 changes: 12 additions & 3 deletions src/Orleans.Core/Runtime/CallbackData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal class CallbackData
private readonly SharedCallbackData shared;
private readonly TaskCompletionSource<object> context;
private int completed;
private StatusResponse lastKnownStatus;
private ValueStopwatch stopwatch;

public CallbackData(
Expand All @@ -30,9 +31,15 @@ public CallbackData(

public bool IsCompleted => this.completed == 1;

public void OnStatusUpdate(StatusResponse status)
{
this.lastKnownStatus = status;
}

public bool IsExpired(long currentTimestamp)
{
return currentTimestamp - this.stopwatch.GetRawTimestamp() > this.shared.ResponseTimeoutStopwatchTicks;
var duration = currentTimestamp - this.stopwatch.GetRawTimestamp();
return duration > shared.ResponseTimeoutStopwatchTicks;
}

public void OnTimeout(TimeSpan timeout)
Expand All @@ -45,7 +52,8 @@ public void OnTimeout(TimeSpan timeout)
var msg = this.Message; // Local working copy

string messageHistory = msg.GetTargetHistory();
string errorMsg = $"Response did not arrive on time in {timeout} for message: {msg}. Target History is: {messageHistory}.";
var statusMessage = lastKnownStatus is StatusResponse status ? $"Last known status is {status}. " : string.Empty;
string errorMsg = $"Response did not arrive on time in {timeout} for message: {msg}. {statusMessage}Target History is: {messageHistory}.";
this.shared.Logger.Warn(ErrorCode.Runtime_Error_100157, "{0} About to break its promise.", errorMsg);

var error = Message.CreatePromptExceptionResponse(msg, new TimeoutException(errorMsg));
Expand All @@ -59,8 +67,9 @@ public void OnTargetSiloFail()
OrleansCallBackDataEvent.Log.OnTargetSiloFail(this.Message);
var msg = this.Message;
var messageHistory = msg.GetTargetHistory();
var statusMessage = lastKnownStatus is StatusResponse status ? $"Last known status is {status}. " : string.Empty;
string errorMsg =
$"The target silo became unavailable for message: {msg}. Target History is: {messageHistory}. See {Constants.TroubleshootingHelpLink} for troubleshooting help.";
$"The target silo became unavailable for message: {msg}. {statusMessage}Target History is: {messageHistory}. See {Constants.TroubleshootingHelpLink} for troubleshooting help.";
this.shared.Logger.Warn(ErrorCode.Runtime_Error_100157, "{0} About to break its promise.", errorMsg);

var error = Message.CreatePromptExceptionResponse(msg, new SiloUnavailableException(errorMsg));
Expand Down
32 changes: 32 additions & 0 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,38 @@ public void ReceiveResponse(Message response)
{
return;
}
else if (response.Result == Message.ResponseTypes.Status)
{
var status = (StatusResponse)response.BodyObject;
callbacks.TryGetValue(response.Id, out var callback);
var request = callback?.Message;
if (!(request is null))
{
callback.OnStatusUpdate(status);

if (status.Diagnostics != null && status.Diagnostics.Count > 0 && logger.IsEnabled(LogLevel.Information))
{
var diagnosticsString = string.Join("\n", status.Diagnostics);
using (request.SetThreadActivityId())
{
this.logger.LogInformation("Received status update for pending request, Request: {RequestMessage}. Status: {Diagnostics}", request, diagnosticsString);
}
}
}
else
{
if (status.Diagnostics != null && status.Diagnostics.Count > 0 && logger.IsEnabled(LogLevel.Information))
{
var diagnosticsString = string.Join("\n", status.Diagnostics);
using (response.SetThreadActivityId())
{
this.logger.LogInformation("Received status update for unknown request. Message: {StatusMessage}. Status: {Diagnostics}", response, diagnosticsString);
}
}
}

return;
}

CallbackData callbackData;
var found = callbacks.TryRemove(response.Id, out callbackData);
Expand Down
25 changes: 25 additions & 0 deletions src/Orleans.Core/Utils/ReferenceEqualsComparer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,29 @@ public override int GetHashCode(object obj)
return obj == null ? 0 : RuntimeHelpers.GetHashCode(obj);
}
}

internal class ReferenceEqualsComparer<T> : EqualityComparer<T> where T : class
{
/// <summary>
/// Gets an instance of this class.
/// </summary>
public static ReferenceEqualsComparer<T> Instance { get; } = new ReferenceEqualsComparer<T>();

/// <summary>
/// Defines object equality by reference equality (eq, in LISP).
/// </summary>
/// <returns>
/// true if the specified objects are equal; otherwise, false.
/// </returns>
/// <param name="x">The first object to compare.</param><param name="y">The second object to compare.</param>
public override bool Equals(T x, T y)
{
return object.ReferenceEquals(x, y);
}

public override int GetHashCode(T obj)
{
return obj == null ? 0 : RuntimeHelpers.GetHashCode(obj);
}
}
}
Loading

0 comments on commit 6eec48d

Please sign in to comment.