Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Merge from dev
Browse files Browse the repository at this point in the history
  • Loading branch information
makam committed May 14, 2018
2 parents b44265e + ebfe7eb commit 6db9dd7
Show file tree
Hide file tree
Showing 57 changed files with 1,946 additions and 493 deletions.
46 changes: 40 additions & 6 deletions build/build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ $configuration = if ($CONFIGURATION -ne $null) { $CONFIGURATION } else { 'Debug
$platform = if ($PLATFORM -ne $null) { $PLATFORM } else { 'Any CPU' }
$projectFolder = if ($ENV:APPVEYOR_BUILD_FOLDER -ne $null) { "$ENV:APPVEYOR_BUILD_FOLDER" } else { $(Get-Location).path }
$buildFolder = $projectFolder + '\build\'
$runtime = if ($ENV:DotNetRunTime -ne $null) { $ENV:DotNetRunTime } else { 'netcoreapp1.0' }
$runtime = if ($ENV:DotNetRunTime -ne $null) { $ENV:DotNetRunTime } else { 'netcoreapp2.0' }
$artifactsFolder = $buildFolder + 'artifacts\'
$appProject = $projectFolder + '\src\Microsoft.Azure.ServiceBus\Microsoft.Azure.ServiceBus.csproj'
$testProject = $projectFolder + '\test\Microsoft.Azure.ServiceBus.UnitTests\Microsoft.Azure.ServiceBus.UnitTests.csproj'
Expand Down Expand Up @@ -157,10 +157,22 @@ function Run-UnitTests
}
if ([bool]$codeCovSecret)
{
$ENV:PATH = 'C:\\Python34;C:\\Python34\\Scripts;' + $ENV:PATH
python -m pip install --upgrade pip
pip install git+git://github.com/codecov/codecov-python.git
codecov -f $coverageFile -t $codeCovSecret -X gcov
try
{
$ENV:PATH = 'C:\\Python34;C:\\Python34\\Scripts;' + $ENV:PATH
python -m pip install --upgrade pip
pip install git+git://github.com/codecov/codecov-python.git
codecov -f $coverageFile -t $codeCovSecret -X gcov

#choco install codecov
#codecov.exe -f $coverageFile -t $codeCovSecret -X gcov
}
catch
{
$error | Format-List *
$_ |select -expandproperty invocationinfo
Write-Host -ForegroundColor Red "Codecov failed"
}
}
else
{
Expand Down Expand Up @@ -200,13 +212,35 @@ function Delete-AzureResources
Write-Host "Completed deleting Azure resources"
}

function Cleanup-EnvironmentVariables
{
Write-Host "Cleaning Environment variables"
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/CodeCovSecret', ' ', "Machine")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/ClientSecret', ' ', "Machine")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/TenantId', ' ', "Machine")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/AppId', ' ', "Machine")

[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/CodeCovSecret', ' ', "User")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/ClientSecret', ' ', "User")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/TenantId', ' ', "User")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/AppId', ' ', "User")

[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/CodeCovSecret', ' ', "Process")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/ClientSecret', ' ', "Process")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/TenantId', ' ', "Process")
[Environment]::SetEnvironmentVariable('azure-service-bus-dotnet/AppId', ' ', "Process")
}

Cleanup-EnvironmentVariables
Build-Solution
if (-Not $canDeploy -and -Not [bool][Environment]::GetEnvironmentVariable($connectionStringVariableName)) {
Write-Host "Build exiting. CanDeploy: " + $canDeploy
return
}
try {
if ($canDeploy -and -not [bool][Environment]::GetEnvironmentVariable($connectionStringVariableName)) {
Deploy-AzureResources

}
if ([bool][Environment]::GetEnvironmentVariable($connectionStringVariableName)) {
Run-UnitTests
Expand All @@ -220,4 +254,4 @@ finally {
if ($canDeploy -and $resourceGroupName) {
Delete-AzureResources
}
}
}
46 changes: 28 additions & 18 deletions src/Microsoft.Azure.ServiceBus/Amqp/ActiveClientLinkManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus.Amqp
{
using Azure.Amqp;
using Primitives;
using System;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -12,6 +13,7 @@ sealed class ActiveClientLinkManager
{
static readonly TimeSpan SendTokenTimeout = TimeSpan.FromMinutes(1);
static readonly TimeSpan TokenRefreshBuffer = TimeSpan.FromSeconds(10);
static readonly TimeSpan MaxTokenRefreshTime = TimeSpan.FromDays(30);

readonly string clientId;
readonly RetryPolicy retryPolicy;
Expand Down Expand Up @@ -81,25 +83,31 @@ async Task RenewCbsTokenAsync(ActiveClientLinkObject activeClientLinkObject)
try
{
var cbsLink = activeClientLinkObject.Connection.Extensions.Find<AmqpCbsLink>() ?? new AmqpCbsLink(activeClientLinkObject.Connection);

MessagingEventSource.Log.AmqpSendAuthenticationTokenStart(activeClientLinkObject.EndpointUri, activeClientLinkObject.Audience, activeClientLinkObject.Audience, activeClientLinkObject.RequiredClaims);

var renewTokenTask = this.retryPolicy.RunOperation(
async () =>
{
activeClientLinkObject.AuthorizationValidUntilUtc = await cbsLink.SendTokenAsync(
this.cbsTokenProvider,
activeClientLinkObject.EndpointUri,
activeClientLinkObject.Audience,
activeClientLinkObject.Audience,
activeClientLinkObject.RequiredClaims,
ActiveClientLinkManager.SendTokenTimeout).ConfigureAwait(false);
}, ActiveClientLinkManager.SendTokenTimeout);

await renewTokenTask.ConfigureAwait(false);
DateTime cbsTokenExpiresAtUtc = DateTime.MaxValue;

foreach (var resource in activeClientLinkObject.Audience)
{
MessagingEventSource.Log.AmqpSendAuthenticationTokenStart(activeClientLinkObject.EndpointUri, resource, resource, activeClientLinkObject.RequiredClaims);

await this.retryPolicy.RunOperation(
async () =>
{
cbsTokenExpiresAtUtc = TimeoutHelper.Min(
cbsTokenExpiresAtUtc,
await cbsLink.SendTokenAsync(
this.cbsTokenProvider,
activeClientLinkObject.EndpointUri,
resource,
resource,
activeClientLinkObject.RequiredClaims,
ActiveClientLinkManager.SendTokenTimeout).ConfigureAwait(false));
}, ActiveClientLinkManager.SendTokenTimeout).ConfigureAwait(false);

MessagingEventSource.Log.AmqpSendAuthenticationTokenStop();
}

activeClientLinkObject.AuthorizationValidUntilUtc = cbsTokenExpiresAtUtc;
this.SetRenewCbsTokenTimer(activeClientLinkObject);

MessagingEventSource.Log.AmqpSendAuthenticationTokenStop();
}
catch (Exception e)
{
Expand All @@ -123,6 +131,8 @@ void SetRenewCbsTokenTimer(ActiveClientLinkObject activeClientLinkObject)
}

var interval = activeClientLinkObject.AuthorizationValidUntilUtc.Subtract(DateTime.UtcNow) - ActiveClientLinkManager.TokenRefreshBuffer;
interval = TimeoutHelper.Min(interval, ActiveClientLinkManager.MaxTokenRefreshTime);

this.ChangeRenewTimer(activeClientLinkObject, interval);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.ServiceBus/Amqp/ActiveClientLinkObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract class ActiveClientLinkObject
{
readonly string[] requiredClaims;

protected ActiveClientLinkObject(AmqpObject amqpLinkObject, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
protected ActiveClientLinkObject(AmqpObject amqpLinkObject, Uri endpointUri, string[] audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
{
this.LinkObject = amqpLinkObject;
this.EndpointUri = endpointUri;
Expand All @@ -21,7 +21,7 @@ protected ActiveClientLinkObject(AmqpObject amqpLinkObject, Uri endpointUri, st

public AmqpObject LinkObject { get; }

public string Audience { get; }
public string[] Audience { get; }

public Uri EndpointUri { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp

sealed class ActiveRequestResponseLink : ActiveClientLinkObject
{
public ActiveRequestResponseLink(RequestResponseAmqpLink link, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
public ActiveRequestResponseLink(RequestResponseAmqpLink link, Uri endpointUri, string[] audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
: base(link, endpointUri, audience, requiredClaims, authorizationValidUntilUtc)
{
this.Link = link;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.Azure.ServiceBus.Amqp

sealed class ActiveSendReceiveClientLink : ActiveClientLinkObject
{
public ActiveSendReceiveClientLink(AmqpLink link, Uri endpointUri, string audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
public ActiveSendReceiveClientLink(AmqpLink link, Uri endpointUri, string[] audience, string[] requiredClaims, DateTime authorizationValidUntilUtc)
: base(link, endpointUri, audience, requiredClaims, authorizationValidUntilUtc)
{
this.Link = link;
Expand Down
1 change: 1 addition & 0 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpClientConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AmqpClientConstants
public static readonly AmqpSymbol AttachEpoch = AmqpConstants.Vendor + ":epoch";
public static readonly AmqpSymbol BatchFlushIntervalName = AmqpConstants.Vendor + ":batch-flush-interval";
public static readonly AmqpSymbol EntityTypeName = AmqpConstants.Vendor + ":entity-type";
public static readonly AmqpSymbol TransferDestinationAddress = AmqpConstants.Vendor + ":transfer-destination-address";
public static readonly AmqpSymbol TimeoutName = AmqpConstants.Vendor + ":timeout";
public static readonly AmqpSymbol TrackingIdName = AmqpConstants.Vendor + ":tracking-id";

Expand Down
16 changes: 14 additions & 2 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpExceptionHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static class AmqpExceptionHelper
{ AmqpClientConstants.PartitionNotOwnedError.Value, AmqpResponseStatusCode.Gone },
{ AmqpClientConstants.EntityDisabledError.Value, AmqpResponseStatusCode.BadRequest },
{ AmqpClientConstants.PublisherRevokedError.Value, AmqpResponseStatusCode.Unauthorized },
{ AmqpClientConstants.AuthorizationFailedError.Value, AmqpResponseStatusCode.Unauthorized},
{ AmqpErrorCode.Stolen.Value, AmqpResponseStatusCode.Gone }
};

Expand Down Expand Up @@ -115,9 +116,10 @@ static Exception ToMessagingContractException(string condition, string message,
return new InvalidOperationException(message);
}

if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value))
if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value) ||
string.Equals(condition, AmqpClientConstants.AuthorizationFailedError.Value))
{
return new UnauthorizedAccessException(message);
return new UnauthorizedException(message);
}

if (string.Equals(condition, AmqpClientConstants.ServerBusyError.Value))
Expand Down Expand Up @@ -160,6 +162,16 @@ static Exception ToMessagingContractException(string condition, string message,
return new MessageSizeExceededException(message);
}

if (string.Equals(condition, AmqpClientConstants.MessageNotFoundError.Value))
{
return new MessageNotFoundException(message);
}

if (string.Equals(condition, AmqpClientConstants.SessionCannotBeLockedError.Value))
{
return new SessionCannotBeLockedException(message);
}

return new ServiceBusException(true, message);
}

Expand Down
17 changes: 12 additions & 5 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpLinkCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ internal abstract class AmqpLinkCreator
readonly string entityPath;
readonly ServiceBusConnection serviceBusConnection;
readonly Uri endpointAddress;
readonly string[] audience;
readonly string[] requiredClaims;
readonly ICbsTokenProvider cbsTokenProvider;
readonly AmqpLinkSettings amqpLinkSettings;

protected AmqpLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings amqpLinkSettings, string clientId)
protected AmqpLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] audience, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings amqpLinkSettings, string clientId)
{
this.entityPath = entityPath;
this.serviceBusConnection = serviceBusConnection;
this.endpointAddress = endpointAddress;
this.audience = audience;
this.requiredClaims = requiredClaims;
this.cbsTokenProvider = cbsTokenProvider;
this.amqpLinkSettings = amqpLinkSettings;
Expand All @@ -41,11 +43,16 @@ public async Task<Tuple<AmqpObject, DateTime>> CreateAndOpenAmqpLinkAsync()

// Authenticate over CBS
var cbsLink = amqpConnection.Extensions.Find<AmqpCbsLink>();
DateTime cbsTokenExpiresAtUtc = DateTime.MaxValue;

var resource = this.endpointAddress.AbsoluteUri;
MessagingEventSource.Log.AmqpSendAuthenticationTokenStart(this.endpointAddress, resource, resource, this.requiredClaims);
var cbsTokenExpiresAtUtc = await cbsLink.SendTokenAsync(this.cbsTokenProvider, this.endpointAddress, resource, resource, this.requiredClaims, timeoutHelper.RemainingTime()).ConfigureAwait(false);
MessagingEventSource.Log.AmqpSendAuthenticationTokenStop();
foreach (var resource in this.audience)
{
MessagingEventSource.Log.AmqpSendAuthenticationTokenStart(this.endpointAddress, resource, resource, this.requiredClaims);
cbsTokenExpiresAtUtc = TimeoutHelper.Min(
cbsTokenExpiresAtUtc,
await cbsLink.SendTokenAsync(this.cbsTokenProvider, this.endpointAddress, resource, resource, this.requiredClaims, timeoutHelper.RemainingTime()).ConfigureAwait(false));
MessagingEventSource.Log.AmqpSendAuthenticationTokenStop();
}

AmqpSession session = null;
try
Expand Down
15 changes: 15 additions & 0 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ static class AmqpMessageConverter
const string PublisherName = "x-opt-publisher";
const string PartitionKeyName = "x-opt-partition-key";
const string PartitionIdName = "x-opt-partition-id";
const string ViaPartitionKeyName = "x-opt-via-partition-key";
const string DeadLetterSourceName = "x-opt-deadletter-source";
const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
const string UriName = AmqpConstants.Vendor + ":uri";
Expand Down Expand Up @@ -87,6 +88,12 @@ public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<SBMessage> sb
firstMessage.PartitionKey;
}

if (firstMessage.ViaPartitionKey != null)
{
amqpMessage.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] =
firstMessage.ViaPartitionKey;
}

amqpMessage.Batchable = batchable;
return amqpMessage;
}
Expand Down Expand Up @@ -129,6 +136,11 @@ public static AmqpMessage SBMessageToAmqpMessage(SBMessage sbMessage)
amqpMessage.MessageAnnotations.Map.Add(PartitionKeyName, sbMessage.PartitionKey);
}

if (sbMessage.ViaPartitionKey != null)
{
amqpMessage.MessageAnnotations.Map.Add(ViaPartitionKeyName, sbMessage.ViaPartitionKey);
}

if (sbMessage.UserProperties != null && sbMessage.UserProperties.Count > 0)
{
if (amqpMessage.ApplicationProperties == null)
Expand Down Expand Up @@ -305,6 +317,9 @@ public static SBMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage)
case PartitionIdName:
sbMessage.SystemProperties.PartitionId = (short)pair.Value;
break;
case ViaPartitionKeyName:
sbMessage.ViaPartitionKey = (string)pair.Value;
break;
case DeadLetterSourceName:
sbMessage.SystemProperties.DeadLetterSource = (string)pair.Value;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ internal class AmqpRequestResponseLinkCreator : AmqpLinkCreator
{
readonly string entityPath;

public AmqpRequestResponseLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, endpointAddress, requiredClaims, cbsTokenProvider, linkSettings, clientId)
public AmqpRequestResponseLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] audience, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, endpointAddress, audience, requiredClaims, cbsTokenProvider, linkSettings, clientId)
{
this.entityPath = entityPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ namespace Microsoft.Azure.ServiceBus.Amqp

internal class AmqpSendReceiveLinkCreator : AmqpLinkCreator
{
public AmqpSendReceiveLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, endpointAddress, requiredClaims, cbsTokenProvider, linkSettings, clientId)
public AmqpSendReceiveLinkCreator(string entityPath, ServiceBusConnection serviceBusConnection, Uri endpointAddress, string[] audience, string[] requiredClaims, ICbsTokenProvider cbsTokenProvider, AmqpLinkSettings linkSettings, string clientId)
: base(entityPath, serviceBusConnection, endpointAddress, audience, requiredClaims, cbsTokenProvider, linkSettings, clientId)
{
}

Expand Down
Loading

0 comments on commit 6db9dd7

Please sign in to comment.