Skip to content

Commit

Permalink
Merge branch 'feat/sqlclientx' into cheena/tdsparser-tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
cheenamalhotra authored Aug 7, 2024
2 parents 24adedd + 295867b commit 6a8ab66
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ internal sealed class PoolingDataSource : SqlDataSource
/// </summary>
private readonly ChannelReader<SqlConnector?> _idleConnectorReader;
private readonly ChannelWriter<SqlConnector?> _idleConnectorWriter;

private ValueTask _warmupTask;
private CancellationTokenSource _warmupCTS;
private readonly SemaphoreSlim _warmupLock;
#endregion

// Counts the total number of open connectors tracked by the pool.
Expand All @@ -65,11 +69,12 @@ internal sealed class PoolingDataSource : SqlDataSource
/// Initializes a new PoolingDataSource.
/// </summary>
//TODO: support auth contexts and provider info
internal PoolingDataSource(SqlConnectionStringBuilder connectionStringBuilder,
internal PoolingDataSource(
SqlConnectionString connectionString,
SqlCredential credential,
DbConnectionPoolGroupOptions options,
RateLimiterBase connectionRateLimiter)
: base(connectionStringBuilder, credential)
: base(connectionString, credential)
{
_connectionPoolGroupOptions = options;
_connectionRateLimiter = connectionRateLimiter;
Expand All @@ -83,6 +88,10 @@ internal PoolingDataSource(SqlConnectionStringBuilder connectionStringBuilder,
_idleConnectorWriter = idleChannel.Writer;

//TODO: initiate idle lifetime and pruning fields

_warmupTask = ValueTask.CompletedTask;
_warmupCTS = new CancellationTokenSource();
_warmupLock = new SemaphoreSlim(1);
}

#region properties
Expand Down Expand Up @@ -192,7 +201,7 @@ internal override async ValueTask<SqlConnector> GetInternalConnection(SqlConnect
return connector;
}
}
}
}
finally
{
//TODO: log error
Expand Down Expand Up @@ -273,7 +282,7 @@ private void CloseConnector(SqlConnector connector)
}


int i;
int i;
for (i = 0; i < MaxPoolSize; i++)
{
if (Interlocked.CompareExchange(ref _connectors[i], null, connector) == connector)
Expand All @@ -300,40 +309,44 @@ private void CloseConnector(SqlConnector connector)

// Only turn off the timer one time, when it was this Close that brought Open back to _min.
//TODO: pruning

// Ensure that we return to min pool size if closing this connector brought us below min pool size.
_ = WarmUp();
}

/// <summary>
/// A state object used to pass context to the rate limited connector creation operation.
/// </summary>
internal readonly struct OpenInternalConnectionState
{
internal readonly SqlConnectionX _owningConnection;
internal readonly TimeSpan _timeout;

internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan timeout)
{
_owningConnection = owningConnection;
_timeout = timeout;
}
internal PoolingDataSource Pool { get; init; }
internal SqlConnectionX? OwningConnection { get; init; }
internal TimeSpan Timeout { get; init; }
}

/// <inheritdoc/>
internal override ValueTask<SqlConnector?> OpenNewInternalConnection(SqlConnectionX owningConnection, TimeSpan timeout, bool async, CancellationToken cancellationToken)
internal override ValueTask<SqlConnector?> OpenNewInternalConnection(SqlConnectionX? owningConnection, TimeSpan timeout, bool async, CancellationToken cancellationToken)
{
return _connectionRateLimiter.Execute(
RateLimitedOpen,
new OpenInternalConnectionState(owningConnection, timeout),
new OpenInternalConnectionState
{
Pool = this,
OwningConnection = owningConnection,
Timeout = timeout
},
async,
cancellationToken
);

async ValueTask<SqlConnector?> RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken)

static async ValueTask<SqlConnector?> RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken)
{
// As long as we're under max capacity, attempt to increase the connector count and open a new connection.
for (var numConnectors = _numConnectors; numConnectors < MaxPoolSize; numConnectors = _numConnectors)
for (var numConnectors = state.Pool._numConnectors; numConnectors < state.Pool.MaxPoolSize; numConnectors = state.Pool._numConnectors)
{
// Note that we purposefully don't use SpinWait for this: https://github.com/dotnet/coreclr/pull/21437
if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors)
if (Interlocked.CompareExchange(ref state.Pool._numConnectors, numConnectors + 1, numConnectors) != numConnectors)
{
continue;
}
Expand All @@ -342,25 +355,25 @@ internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan t
{
// We've managed to increase the open counter, open a physical connection.
var startTime = Stopwatch.GetTimestamp();
SqlConnector? connector = new SqlConnector(state._owningConnection, this);
SqlConnector? connector = new SqlConnector(state.OwningConnection, state.Pool);
//TODO: set clear counter on connector

await connector.Open(timeout, async, cancellationToken).ConfigureAwait(false);
await connector.Open(state.Timeout, async, cancellationToken).ConfigureAwait(false);

int i;
for (i = 0; i < MaxPoolSize; i++)
for (i = 0; i < state.Pool.MaxPoolSize; i++)
{
if (Interlocked.CompareExchange(ref _connectors[i], connector, null) == null)
if (Interlocked.CompareExchange(ref state.Pool._connectors[i], connector, null) == null)
{
break;
}
}

Debug.Assert(i < MaxPoolSize, $"Could not find free slot in {_connectors} when opening.");
if (i == MaxPoolSize)
Debug.Assert(i < state.Pool.MaxPoolSize, $"Could not find free slot in {state.Pool._connectors} when opening.");
if (i == state.Pool.MaxPoolSize)
{
//TODO: generic exception?
throw new Exception($"Could not find free slot in {_connectors} when opening. Please report a bug.");
throw new Exception($"Could not find free slot in {state.Pool._connectors} when opening. Please report a bug.");
}

// Only start pruning if we've incremented open count past _min.
Expand All @@ -375,12 +388,12 @@ internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan t
catch
{
// Physical open failed, decrement the open and busy counter back down.
Interlocked.Decrement(ref _numConnectors);
Interlocked.Decrement(ref state.Pool._numConnectors);

// In case there's a waiting attempt on the channel, we write a null to the idle connector channel
// to wake it up, so it will try opening (and probably throw immediately)
// Statement order is important since we have synchronous completions on the channel.
_idleConnectorWriter.TryWrite(null);
state.Pool._idleConnectorWriter.TryWrite(null);

// Just in case we always call UpdatePruningTimer for failed physical open
//TODO: UpdatePruningTimer();
Expand Down Expand Up @@ -425,12 +438,67 @@ internal void PruneIdleConnections()
}

/// <summary>
/// Warms up the pool to bring it up to min pool size.
/// Warms up the pool by bringing it up to min pool size.
/// </summary>
/// <exception cref="NotImplementedException"></exception>
internal void WarmUp()
/// <returns>A ValueTask containing a ValueTask that represents the warmup process.</returns>
internal async ValueTask<ValueTask> WarmUp()
{
throw new NotImplementedException();
// Avoid semaphore wait if task is still running
if (!_warmupTask.IsCompleted)
{
return _warmupTask;
}

// Prevent multiple threads from modifying the warmup task
await _warmupLock.WaitAsync();

try
{
// The task may have been started by another thread while we were
// waiting on the semaphore
if (_warmupTask.IsCompleted)
{
_warmupTask = _WarmUp(_warmupCTS.Token);
}
}
finally
{
_warmupLock.Release();
}

return _warmupTask;

async ValueTask _WarmUp(CancellationToken ct)
{
// Best effort, we may over or under create due to race conditions.
// Open new connections slowly. If many connections are needed immediately
// upon pool creation they can always be created via user-initiated requests as fast
// as a parallel, pool-initiated approach could.
while (_numConnectors < MinPoolSize)
{
ct.ThrowIfCancellationRequested();

// Obey the same rate limit as user-initiated opens.
// Ensures that pool-initiated opens are queued properly alongside user requests.
SqlConnector? connector = await OpenNewInternalConnection(
null,
TimeSpan.FromSeconds(Settings.ConnectTimeout),
true,
ct)
.ConfigureAwait(false);

// If connector is null, then we hit the max pool size and can stop
// warming up the pool.
if (connector == null)
{
return;
}

// The connector has never been used, so it's safe to immediately return it to the
// pool without resetting it.
ReturnInternalConnection(connector);
}
}
}

/// <summary>
Expand All @@ -439,6 +507,8 @@ internal void WarmUp()
internal void Shutdown()
{
SqlClientEventSource.Log.TryPoolerTraceEvent("<prov.DbConnectionPool.Shutdown|RES|INFO|CPOOL> {0}", ObjectID);
_warmupCTS.Dispose();
_warmupLock.Dispose();
_connectionRateLimiter?.Dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal sealed class SqlConnector
private readonly TdsParserX _parser;
private readonly ConnectionHandlerContext _connectionHandlerContext;

internal SqlConnector(SqlConnectionX owningConnection, SqlConnectionString connectionOptions, SqlDataSource dataSource)
internal SqlConnector(SqlConnectionX? owningConnection, SqlConnectionString connectionOptions, SqlDataSource dataSource)
{
OwningConnection = owningConnection;
ConnectionOptions = connectionOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

using System;
using System.Data.Common;
using System.Security.AccessControl;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
Expand All @@ -22,32 +21,43 @@ namespace Microsoft.Data.SqlClientX
/// </summary>
internal abstract class SqlDataSource : DbDataSource
{
#region private
private readonly SqlConnectionStringBuilder _connectionStringBuilder;
private protected volatile int _isDisposed;
#endregion

#region constructors
/// <summary>
/// Initializes a new instance of SqlDataSource.
/// </summary>
/// <param name="connectionStringBuilder">The connection string that connections produced by this data source should use.</param>
/// <param name="connectionString">The connection string that connections produced by this data source should use.</param>
/// <param name="credential">The credentials that connections produced by this data source should use.</param>
internal SqlDataSource(
SqlConnectionStringBuilder connectionStringBuilder,
SqlConnectionString connectionString,
SqlCredential credential)
{
_connectionStringBuilder = connectionStringBuilder;
Settings = connectionString;
var hidePassword = !connectionString.PersistSecurityInfo;
ConnectionString = connectionString.UsersConnectionString(hidePassword);
Credential = credential;
}
#endregion

#region properties
/// <inheritdoc />
public override string ConnectionString => _connectionStringBuilder.ConnectionString;
public override string ConnectionString { get; }

/// <summary>
/// Stores settings for the data source.
/// Do not expose publicly.
/// </summary>
internal SqlConnectionString Settings { get; }

/// <summary>
/// Credentials to use for new connections created by this data source.
/// </summary>
internal SqlCredential Credential { get; }

/// <summary>
/// Gives pool statistics for total, idle, and busy connections.
/// </summary>
internal abstract (int Total, int Idle, int Busy) Statistics { get; }
#endregion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal sealed class SqlDataSourceBuilder
/// <summary>
/// A connection string builder that can be used to configure the connection string on the builder.
/// </summary>
public SqlConnectionStringBuilder ConnectionStringBuilder { get; }
public SqlConnectionString ConnectionString { get; }

// TODO: how does it interact with credentials specified in ConnectionStringBuilder?
public SqlCredential Credential { get; set; }
Expand All @@ -44,7 +44,7 @@ public SqlDataSourceBuilder(string connectionString = null, SqlCredential creden
/// </summary>
public SqlDataSourceBuilder(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential sqlCredential = null)
{
ConnectionStringBuilder = connectionStringBuilder;
ConnectionString = new SqlConnectionString(connectionStringBuilder.ConnectionString);
Credential = sqlCredential;
}

Expand All @@ -53,44 +53,44 @@ public SqlDataSourceBuilder(SqlConnectionStringBuilder connectionStringBuilder,
/// </summary>
public SqlDataSource Build()
{
if (ConnectionStringBuilder.Pooling)
if (ConnectionString.Pooling)
{
//TODO: pool group layer

DbConnectionPoolGroupOptions poolGroupOptions = new DbConnectionPoolGroupOptions(
ConnectionStringBuilder.IntegratedSecurity,
ConnectionStringBuilder.MinPoolSize,
ConnectionStringBuilder.MaxPoolSize,
ConnectionString.IntegratedSecurity,
ConnectionString.MinPoolSize,
ConnectionString.MaxPoolSize,
//TODO: carry over connect timeout conversion logic from SqlConnectionFactory? if not, don't need an extra allocation for this object, just use connection string builder
ConnectionStringBuilder.ConnectTimeout,
ConnectionStringBuilder.LoadBalanceTimeout,
ConnectionStringBuilder.Enlist);
ConnectionString.ConnectTimeout,
ConnectionString.LoadBalanceTimeout,
ConnectionString.Enlist);

//TODO: evaluate app context switch for concurrency limit
RateLimiterBase rateLimiter = IsBlockingPeriodEnabled() ? new BlockingPeriodRateLimiter() : new PassthroughRateLimiter();

return new PoolingDataSource(ConnectionStringBuilder,
return new PoolingDataSource(ConnectionString,
Credential,
poolGroupOptions,
rateLimiter);
}
else
{
return new UnpooledDataSource(
ConnectionStringBuilder,
ConnectionString,
Credential);
}
}

private bool IsBlockingPeriodEnabled()
{
var policy = ConnectionStringBuilder.PoolBlockingPeriod;
var policy = ConnectionString.PoolBlockingPeriod;

switch (policy)
{
case PoolBlockingPeriod.Auto:
{
if (ADP.IsAzureSqlServerEndpoint(ConnectionStringBuilder.DataSource))
if (ADP.IsAzureSqlServerEndpoint(ConnectionString.DataSource))
{
return false; // in Azure it will be Disabled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ internal sealed class UnpooledDataSource : SqlDataSource
/// <summary>
/// Initializes a new instance of UnpooledDataSource.
/// </summary>
/// <param name="connectionStringBuilder"></param>
/// <param name="connectionString"></param>
/// <param name="credential"></param>
internal UnpooledDataSource(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential credential) :
base(connectionStringBuilder, credential)
internal UnpooledDataSource(SqlConnectionString connectionString, SqlCredential credential) :
base(connectionString, credential)
{
}

Expand Down
Loading

0 comments on commit 6a8ab66

Please sign in to comment.