diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs index 1a20125864..afd88e2d38 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/PoolingDataSource.cs @@ -53,6 +53,10 @@ internal sealed class PoolingDataSource : SqlDataSource /// private readonly ChannelReader _idleConnectorReader; private readonly ChannelWriter _idleConnectorWriter; + + private ValueTask _warmupTask; + private CancellationTokenSource _warmupCTS; + private readonly SemaphoreSlim _warmupLock; #endregion // Counts the total number of open connectors tracked by the pool. @@ -65,11 +69,12 @@ internal sealed class PoolingDataSource : SqlDataSource /// Initializes a new PoolingDataSource. /// //TODO: support auth contexts and provider info - internal PoolingDataSource(SqlConnectionStringBuilder connectionStringBuilder, + internal PoolingDataSource( + SqlConnectionString connectionString, SqlCredential credential, DbConnectionPoolGroupOptions options, RateLimiterBase connectionRateLimiter) - : base(connectionStringBuilder, credential) + : base(connectionString, credential) { _connectionPoolGroupOptions = options; _connectionRateLimiter = connectionRateLimiter; @@ -83,6 +88,10 @@ internal PoolingDataSource(SqlConnectionStringBuilder connectionStringBuilder, _idleConnectorWriter = idleChannel.Writer; //TODO: initiate idle lifetime and pruning fields + + _warmupTask = ValueTask.CompletedTask; + _warmupCTS = new CancellationTokenSource(); + _warmupLock = new SemaphoreSlim(1); } #region properties @@ -192,7 +201,7 @@ internal override async ValueTask GetInternalConnection(SqlConnect return connector; } } - } + } finally { //TODO: log error @@ -273,7 +282,7 @@ private void CloseConnector(SqlConnector connector) } - int i; + int i; for (i = 0; i < MaxPoolSize; i++) { if (Interlocked.CompareExchange(ref _connectors[i], null, connector) == connector) @@ -300,6 +309,9 @@ private void CloseConnector(SqlConnector connector) // Only turn off the timer one time, when it was this Close that brought Open back to _min. //TODO: pruning + + // Ensure that we return to min pool size if closing this connector brought us below min pool size. + _ = WarmUp(); } /// @@ -307,33 +319,34 @@ private void CloseConnector(SqlConnector connector) /// internal readonly struct OpenInternalConnectionState { - internal readonly SqlConnectionX _owningConnection; - internal readonly TimeSpan _timeout; - - internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan timeout) - { - _owningConnection = owningConnection; - _timeout = timeout; - } + internal PoolingDataSource Pool { get; init; } + internal SqlConnectionX? OwningConnection { get; init; } + internal TimeSpan Timeout { get; init; } } /// - internal override ValueTask OpenNewInternalConnection(SqlConnectionX owningConnection, TimeSpan timeout, bool async, CancellationToken cancellationToken) + internal override ValueTask OpenNewInternalConnection(SqlConnectionX? owningConnection, TimeSpan timeout, bool async, CancellationToken cancellationToken) { return _connectionRateLimiter.Execute( RateLimitedOpen, - new OpenInternalConnectionState(owningConnection, timeout), + new OpenInternalConnectionState + { + Pool = this, + OwningConnection = owningConnection, + Timeout = timeout + }, async, cancellationToken ); - async ValueTask RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken) + + static async ValueTask RateLimitedOpen(OpenInternalConnectionState state, bool async, CancellationToken cancellationToken) { // As long as we're under max capacity, attempt to increase the connector count and open a new connection. - for (var numConnectors = _numConnectors; numConnectors < MaxPoolSize; numConnectors = _numConnectors) + for (var numConnectors = state.Pool._numConnectors; numConnectors < state.Pool.MaxPoolSize; numConnectors = state.Pool._numConnectors) { // Note that we purposefully don't use SpinWait for this: https://github.com/dotnet/coreclr/pull/21437 - if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors) + if (Interlocked.CompareExchange(ref state.Pool._numConnectors, numConnectors + 1, numConnectors) != numConnectors) { continue; } @@ -342,25 +355,25 @@ internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan t { // We've managed to increase the open counter, open a physical connection. var startTime = Stopwatch.GetTimestamp(); - SqlConnector? connector = new SqlConnector(state._owningConnection, this); + SqlConnector? connector = new SqlConnector(state.OwningConnection, state.Pool); //TODO: set clear counter on connector - await connector.Open(timeout, async, cancellationToken).ConfigureAwait(false); + await connector.Open(state.Timeout, async, cancellationToken).ConfigureAwait(false); int i; - for (i = 0; i < MaxPoolSize; i++) + for (i = 0; i < state.Pool.MaxPoolSize; i++) { - if (Interlocked.CompareExchange(ref _connectors[i], connector, null) == null) + if (Interlocked.CompareExchange(ref state.Pool._connectors[i], connector, null) == null) { break; } } - Debug.Assert(i < MaxPoolSize, $"Could not find free slot in {_connectors} when opening."); - if (i == MaxPoolSize) + Debug.Assert(i < state.Pool.MaxPoolSize, $"Could not find free slot in {state.Pool._connectors} when opening."); + if (i == state.Pool.MaxPoolSize) { //TODO: generic exception? - throw new Exception($"Could not find free slot in {_connectors} when opening. Please report a bug."); + throw new Exception($"Could not find free slot in {state.Pool._connectors} when opening. Please report a bug."); } // Only start pruning if we've incremented open count past _min. @@ -375,12 +388,12 @@ internal OpenInternalConnectionState(SqlConnectionX owningConnection, TimeSpan t catch { // Physical open failed, decrement the open and busy counter back down. - Interlocked.Decrement(ref _numConnectors); + Interlocked.Decrement(ref state.Pool._numConnectors); // In case there's a waiting attempt on the channel, we write a null to the idle connector channel // to wake it up, so it will try opening (and probably throw immediately) // Statement order is important since we have synchronous completions on the channel. - _idleConnectorWriter.TryWrite(null); + state.Pool._idleConnectorWriter.TryWrite(null); // Just in case we always call UpdatePruningTimer for failed physical open //TODO: UpdatePruningTimer(); @@ -425,12 +438,67 @@ internal void PruneIdleConnections() } /// - /// Warms up the pool to bring it up to min pool size. + /// Warms up the pool by bringing it up to min pool size. /// - /// - internal void WarmUp() + /// A ValueTask containing a ValueTask that represents the warmup process. + internal async ValueTask WarmUp() { - throw new NotImplementedException(); + // Avoid semaphore wait if task is still running + if (!_warmupTask.IsCompleted) + { + return _warmupTask; + } + + // Prevent multiple threads from modifying the warmup task + await _warmupLock.WaitAsync(); + + try + { + // The task may have been started by another thread while we were + // waiting on the semaphore + if (_warmupTask.IsCompleted) + { + _warmupTask = _WarmUp(_warmupCTS.Token); + } + } + finally + { + _warmupLock.Release(); + } + + return _warmupTask; + + async ValueTask _WarmUp(CancellationToken ct) + { + // Best effort, we may over or under create due to race conditions. + // Open new connections slowly. If many connections are needed immediately + // upon pool creation they can always be created via user-initiated requests as fast + // as a parallel, pool-initiated approach could. + while (_numConnectors < MinPoolSize) + { + ct.ThrowIfCancellationRequested(); + + // Obey the same rate limit as user-initiated opens. + // Ensures that pool-initiated opens are queued properly alongside user requests. + SqlConnector? connector = await OpenNewInternalConnection( + null, + TimeSpan.FromSeconds(Settings.ConnectTimeout), + true, + ct) + .ConfigureAwait(false); + + // If connector is null, then we hit the max pool size and can stop + // warming up the pool. + if (connector == null) + { + return; + } + + // The connector has never been used, so it's safe to immediately return it to the + // pool without resetting it. + ReturnInternalConnection(connector); + } + } } /// @@ -439,6 +507,8 @@ internal void WarmUp() internal void Shutdown() { SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}", ObjectID); + _warmupCTS.Dispose(); + _warmupLock.Dispose(); _connectionRateLimiter?.Dispose(); } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlConnector.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlConnector.cs index d38b03ab79..5286aab962 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlConnector.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlConnector.cs @@ -28,7 +28,7 @@ internal sealed class SqlConnector private readonly TdsParserX _parser; private readonly ConnectionHandlerContext _connectionHandlerContext; - internal SqlConnector(SqlConnectionX owningConnection, SqlConnectionString connectionOptions, SqlDataSource dataSource) + internal SqlConnector(SqlConnectionX? owningConnection, SqlConnectionString connectionOptions, SqlDataSource dataSource) { OwningConnection = owningConnection; ConnectionOptions = connectionOptions; diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs index fc8aaa2df9..3d0cf70e14 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSource.cs @@ -7,7 +7,6 @@ using System; using System.Data.Common; -using System.Security.AccessControl; using System.Threading; using System.Threading.Tasks; using Microsoft.Data.SqlClient; @@ -22,32 +21,43 @@ namespace Microsoft.Data.SqlClientX /// internal abstract class SqlDataSource : DbDataSource { - #region private - private readonly SqlConnectionStringBuilder _connectionStringBuilder; private protected volatile int _isDisposed; - #endregion #region constructors /// /// Initializes a new instance of SqlDataSource. /// - /// The connection string that connections produced by this data source should use. + /// The connection string that connections produced by this data source should use. /// The credentials that connections produced by this data source should use. internal SqlDataSource( - SqlConnectionStringBuilder connectionStringBuilder, + SqlConnectionString connectionString, SqlCredential credential) { - _connectionStringBuilder = connectionStringBuilder; + Settings = connectionString; + var hidePassword = !connectionString.PersistSecurityInfo; + ConnectionString = connectionString.UsersConnectionString(hidePassword); Credential = credential; } #endregion #region properties /// - public override string ConnectionString => _connectionStringBuilder.ConnectionString; + public override string ConnectionString { get; } + + /// + /// Stores settings for the data source. + /// Do not expose publicly. + /// + internal SqlConnectionString Settings { get; } + /// + /// Credentials to use for new connections created by this data source. + /// internal SqlCredential Credential { get; } + /// + /// Gives pool statistics for total, idle, and busy connections. + /// internal abstract (int Total, int Idle, int Busy) Statistics { get; } #endregion diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs index a2a22bbdaa..6aa1b43186 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/SqlDataSourceBuilder.cs @@ -23,7 +23,7 @@ internal sealed class SqlDataSourceBuilder /// /// A connection string builder that can be used to configure the connection string on the builder. /// - public SqlConnectionStringBuilder ConnectionStringBuilder { get; } + public SqlConnectionString ConnectionString { get; } // TODO: how does it interact with credentials specified in ConnectionStringBuilder? public SqlCredential Credential { get; set; } @@ -44,7 +44,7 @@ public SqlDataSourceBuilder(string connectionString = null, SqlCredential creden /// public SqlDataSourceBuilder(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential sqlCredential = null) { - ConnectionStringBuilder = connectionStringBuilder; + ConnectionString = new SqlConnectionString(connectionStringBuilder.ConnectionString); Credential = sqlCredential; } @@ -53,23 +53,23 @@ public SqlDataSourceBuilder(SqlConnectionStringBuilder connectionStringBuilder, /// public SqlDataSource Build() { - if (ConnectionStringBuilder.Pooling) + if (ConnectionString.Pooling) { //TODO: pool group layer DbConnectionPoolGroupOptions poolGroupOptions = new DbConnectionPoolGroupOptions( - ConnectionStringBuilder.IntegratedSecurity, - ConnectionStringBuilder.MinPoolSize, - ConnectionStringBuilder.MaxPoolSize, + ConnectionString.IntegratedSecurity, + ConnectionString.MinPoolSize, + ConnectionString.MaxPoolSize, //TODO: carry over connect timeout conversion logic from SqlConnectionFactory? if not, don't need an extra allocation for this object, just use connection string builder - ConnectionStringBuilder.ConnectTimeout, - ConnectionStringBuilder.LoadBalanceTimeout, - ConnectionStringBuilder.Enlist); + ConnectionString.ConnectTimeout, + ConnectionString.LoadBalanceTimeout, + ConnectionString.Enlist); //TODO: evaluate app context switch for concurrency limit RateLimiterBase rateLimiter = IsBlockingPeriodEnabled() ? new BlockingPeriodRateLimiter() : new PassthroughRateLimiter(); - return new PoolingDataSource(ConnectionStringBuilder, + return new PoolingDataSource(ConnectionString, Credential, poolGroupOptions, rateLimiter); @@ -77,20 +77,20 @@ public SqlDataSource Build() else { return new UnpooledDataSource( - ConnectionStringBuilder, + ConnectionString, Credential); } } private bool IsBlockingPeriodEnabled() { - var policy = ConnectionStringBuilder.PoolBlockingPeriod; + var policy = ConnectionString.PoolBlockingPeriod; switch (policy) { case PoolBlockingPeriod.Auto: { - if (ADP.IsAzureSqlServerEndpoint(ConnectionStringBuilder.DataSource)) + if (ADP.IsAzureSqlServerEndpoint(ConnectionString.DataSource)) { return false; // in Azure it will be Disabled } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs index 0c1694faeb..c7f772860f 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClientX/UnpooledDataSource.cs @@ -23,10 +23,10 @@ internal sealed class UnpooledDataSource : SqlDataSource /// /// Initializes a new instance of UnpooledDataSource. /// - /// + /// /// - internal UnpooledDataSource(SqlConnectionStringBuilder connectionStringBuilder, SqlCredential credential) : - base(connectionStringBuilder, credential) + internal UnpooledDataSource(SqlConnectionString connectionString, SqlCredential credential) : + base(connectionString, credential) { } diff --git a/src/Microsoft.Data.SqlClient/tests/UnitTests/PoolTests.cs b/src/Microsoft.Data.SqlClient/tests/UnitTests/PoolTests.cs index acc9d16d6a..58f33c03e7 100644 --- a/src/Microsoft.Data.SqlClient/tests/UnitTests/PoolTests.cs +++ b/src/Microsoft.Data.SqlClient/tests/UnitTests/PoolTests.cs @@ -85,7 +85,7 @@ public async Task Get_connector_from_exhausted_pool(bool async) else conn2.Open(); } - + [Theory] [InlineData(true)] [InlineData(false)] @@ -113,7 +113,7 @@ public async Task Timeout_getting_connector_from_exhausted_pool(bool async) // conn1 should now be back in the pool as idle await using var conn3 = await dataSource.OpenConnectionAsync(); } - + [Fact] //[Explicit("Timing-based")] public async Task OpenAsync_cancel() @@ -459,6 +459,53 @@ public async Task ConnectionLifetime() Assert.That(conn.ProcessID, Is.Not.EqualTo(processId)); } */ + + [Theory] + [InlineData(0)] + [InlineData(1)] + [InlineData(5)] + public async Task Warmup_OpenedConnectionsEqualsMinPoolSize(int minPoolSize) + { + // Arrange + await using var dataSource = (PoolingDataSource)testBase.CreateDataSource(csb => csb.MinPoolSize = minPoolSize); + + // Act + await await dataSource.WarmUp(); + + // Assert + Assert.Equal(minPoolSize, dataSource.Statistics.Total); + Assert.Equal(minPoolSize, dataSource.Statistics.Idle); + } + + [Fact] + public async Task Warmup_ConcurrentWarmupCalls_OnlyOneExecuted() + { + // Arrange + await using var dataSource = (PoolingDataSource)testBase.CreateDataSource(csb => csb.MinPoolSize = 10); + + // Act + ValueTask t1 = await dataSource.WarmUp(); + ValueTask t2 = await dataSource.WarmUp(); + + // Assert + Assert.Equal(t1, t2); + } + + [Fact] + public async Task Warmup_SequentialWarmupCalls_BothExecuted() + { + // Arrange + await using var dataSource = (PoolingDataSource)testBase.CreateDataSource(csb => csb.MinPoolSize = 10); + + // Act + ValueTask t1 = await dataSource.WarmUp(); + await t1; + ValueTask t2 = await dataSource.WarmUp(); + + // Assert + Assert.NotEqual(t1, t2); + } + #region Support volatile int StopFlag;