Skip to content

Commit

Permalink
-Added new explicit CopyTableDataAsync() APIs which enable explicit c…
Browse files Browse the repository at this point in the history
…opying of data between two tables on matching columns (automatically detected by Name and Data Type).

-Added new Materialized Data Configuration value MaterializedDataLoadingTableDataCopyMode to control whether the materialized data process automatically copies data into the Loading Tables after cloning. This helps to greatly simplify new use cases where data must be merged (and preserved) during the materialization process.
  • Loading branch information
cajuncoding committed Sep 11, 2023
1 parent 0d9de38 commit cac2c35
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 43 deletions.
16 changes: 9 additions & 7 deletions NetStandard.SqlBulkHelpers/MaterializedData/CloneTableInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ public readonly struct CloneTableInfo
{
public TableNameTerm SourceTable { get; }
public TableNameTerm TargetTable { get; }
public bool CopyDataFromSource { get; }

public CloneTableInfo(TableNameTerm sourceTable, TableNameTerm? targetTable = null)
public CloneTableInfo(TableNameTerm sourceTable, TableNameTerm? targetTable = null, bool copyDataFromSource = false)
{
sourceTable.AssertArgumentIsNotNull(nameof(sourceTable));

Expand All @@ -20,6 +21,7 @@ public CloneTableInfo(TableNameTerm sourceTable, TableNameTerm? targetTable = nu

SourceTable = sourceTable;
TargetTable = validTargetTable;
CopyDataFromSource = copyDataFromSource;
}

/// <summary>
Expand All @@ -32,7 +34,7 @@ public CloneTableInfo MakeTargetTableNameUnique()
private static TableNameTerm MakeTableNameUniqueInternal(TableNameTerm tableNameTerm)
=> TableNameTerm.From(tableNameTerm.SchemaName, string.Concat(tableNameTerm.TableName, "_", IdGenerator.NewId(10)));

public static CloneTableInfo From<TSource, TTarget>(string sourceTableName = null, string targetTableName = null, string targetPrefix = null, string targetSuffix = null)
public static CloneTableInfo From<TSource, TTarget>(string sourceTableName = null, string targetTableName = null, string targetPrefix = null, string targetSuffix = null, bool copyDataFromSource = false)
{
//If the generic type is ISkipMappingLookup then we must have a valid sourceTableName specified as a param...
if (SqlBulkHelpersProcessingDefinition.SkipMappingLookupType.IsAssignableFrom(typeof(TSource)))
Expand All @@ -55,13 +57,13 @@ public static CloneTableInfo From<TSource, TTarget>(string sourceTableName = nul
}

var targetTable = TableNameTerm.From<TTarget>(targetTableName ?? sourceTableName).ApplyNamePrefixOrSuffix(targetPrefix, targetSuffix);
return new CloneTableInfo(sourceTable, targetTable);
return new CloneTableInfo(sourceTable, targetTable, copyDataFromSource);
}

public static CloneTableInfo From(string sourceTableName, string targetTableName = null, string targetPrefix = null, string targetSuffix = null)
=> From<ISkipMappingLookup, ISkipMappingLookup>(sourceTableName, targetTableName, targetPrefix, targetSuffix);
public static CloneTableInfo From(string sourceTableName, string targetTableName = null, string targetPrefix = null, string targetSuffix = null, bool copyDataFromSource = false)
=> From<ISkipMappingLookup, ISkipMappingLookup>(sourceTableName, targetTableName, targetPrefix, targetSuffix, copyDataFromSource);

public static CloneTableInfo ForNewSchema(TableNameTerm sourceTable, string targetSchemaName, string targetTablePrefix = null, string targetTableSuffix = null)
=> new CloneTableInfo(sourceTable, sourceTable.SwitchSchema(targetSchemaName).ApplyNamePrefixOrSuffix(targetTablePrefix, targetTableSuffix));
public static CloneTableInfo ForNewSchema(TableNameTerm sourceTable, string targetSchemaName, string targetTablePrefix = null, string targetTableSuffix = null, bool copyDataFromSource = false)
=> new CloneTableInfo(sourceTable, sourceTable.SwitchSchema(targetSchemaName).ApplyNamePrefixOrSuffix(targetTablePrefix, targetTableSuffix), copyDataFromSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ protected async Task<MaterializationTableInfo[]> CloneTableStructuresForMaterial
originalTableNameTerm,
loadingTablesSchema,
BulkHelpersConfig.MaterializedDataLoadingTablePrefix,
BulkHelpersConfig.MaterializedDataLoadingTableSuffix
BulkHelpersConfig.MaterializedDataLoadingTableSuffix,
//For the Loading Table specify if CopyDataFromSource is enabled based on our configuration...
copyDataFromSource: BulkHelpersConfig.MaterializedDataLoadingTableDataCopyMode == DataCopyMode.CopySourceData
);

//Add Clones for Discarding tables (used for switching Live OUT for later cleanup)...
Expand Down Expand Up @@ -208,7 +210,6 @@ await CloneTablesInternalAsync(
sqlTransaction,
cloneInfoToExecuteList,
recreateIfExists: true,
copyDataFromSource: false,
includeFKeyConstraints: false
).ConfigureAwait(false);

Expand Down Expand Up @@ -279,14 +280,14 @@ public async Task<CloneTableInfo> CloneTableAsync(
string targetTableName = null,
bool recreateIfExists = false,
bool copyDataFromSource = false
) => (await CloneTablesAsync(sqlTransaction, tablesToClone: new[] { CloneTableInfo.From<T, T>(sourceTableName, targetTableName) }, recreateIfExists).ConfigureAwait(false)).FirstOrDefault();
) => (await CloneTablesAsync(sqlTransaction, tablesToClone: new[] { CloneTableInfo.From<T, T>(sourceTableName, targetTableName) }, recreateIfExists, copyDataFromSource).ConfigureAwait(false)).FirstOrDefault();

public Task<CloneTableInfo[]> CloneTablesAsync(
SqlTransaction sqlTransaction,
bool recreateIfExists,
bool copyDataFromSource,
params CloneTableInfo[] tablesToClone
) => CloneTablesAsync(sqlTransaction, tablesToClone, recreateIfExists, copyDataFromSource);
) => CloneTablesAsync(sqlTransaction, tablesToClone.ToList(), recreateIfExists, copyDataFromSource);

public Task<CloneTableInfo[]> CloneTablesAsync(
SqlTransaction sqlTransaction,
Expand All @@ -301,7 +302,6 @@ protected async Task<CloneTableInfo[]> CloneTablesInternalAsync(
SqlTransaction sqlTransaction,
IEnumerable<CloneTableInfo> tablesToClone,
bool recreateIfExists = false,
bool copyDataFromSource = false,
bool includeFKeyConstraints = false
)
{
Expand Down Expand Up @@ -334,7 +334,7 @@ protected async Task<CloneTableInfo[]> CloneTablesInternalAsync(
recreateIfExists ? IfExists.Recreate : IfExists.StopProcessingWithException,
cloneIdentitySeedValue: BulkHelpersConfig.IsCloningIdentitySeedValueEnabled,
includeFKeyConstraints: includeFKeyConstraints,
copyDataFromSource: copyDataFromSource
copyDataFromSource: cloneInfo.CopyDataFromSource
);

////TODO: Might (potentially if it doesn't impede performance too much) implement support for re-mapping FKey constraints to Materialization Context tables so data integrity issues will be caught sooner
Expand All @@ -356,6 +356,74 @@ await sqlTransaction

#endregion

#region Copy Table Data API Methods

/// <summary>
/// Copies table data for a single source/target pair. The pair is built with
/// <c>CloneTableInfo.From&lt;T, T&gt;</c> (with <c>copyDataFromSource: true</c>) and is then
/// handed to the multi-table overload.
/// </summary>
/// <param name="sqlTransaction">The transaction passed through to the multi-table overload.</param>
/// <param name="sourceTableName">Optional source table name forwarded to <c>CloneTableInfo.From</c>.</param>
/// <param name="targetTableName">Optional target table name forwarded to <c>CloneTableInfo.From</c>.</param>
/// <returns>The first entry of the processed results (or the default value if there is none).</returns>
public async Task<CloneTableInfo> CopyTableDataAsync(
    SqlTransaction sqlTransaction,
    string sourceTableName = null,
    string targetTableName = null
)
{
    var singleTablePair = CloneTableInfo.From<T, T>(sourceTableName, targetTableName, copyDataFromSource: true);

    var processedTables = await CopyTableDataAsync(sqlTransaction, tablesToProcess: new[] { singleTablePair })
        .ConfigureAwait(false);

    return processedTables.FirstOrDefault();
}

/// <summary>
/// Convenience overload that accepts the source/target pairs as a <c>params</c> array
/// and forwards them to the <c>IEnumerable</c> based overload.
/// </summary>
/// <param name="sqlTransaction">The transaction passed through to the enumerable overload.</param>
/// <param name="tablesToProcess">The source/target table pairs whose data should be copied.</param>
/// <returns>The task produced by the enumerable overload.</returns>
public Task<CloneTableInfo[]> CopyTableDataAsync(
    SqlTransaction sqlTransaction,
    params CloneTableInfo[] tablesToProcess
)
{
    //Materialize as a List so the call binds to the IEnumerable overload rather than back to this params overload...
    var tablePairs = tablesToProcess.ToList();
    return CopyTableDataAsync(sqlTransaction, tablePairs);
}

/// <summary>
/// Public entry point for copying data between one or more source/target table pairs;
/// delegates directly to <c>CopyTableDataInternalAsync</c>.
/// </summary>
/// <param name="sqlTransaction">The transaction passed through to the internal implementation.</param>
/// <param name="tablesToProcess">The source/target table pairs whose data should be copied.</param>
/// <returns>The task produced by the internal implementation.</returns>
public Task<CloneTableInfo[]> CopyTableDataAsync(
    SqlTransaction sqlTransaction,
    IEnumerable<CloneTableInfo> tablesToProcess
)
{
    return CopyTableDataInternalAsync(sqlTransaction, tablesToProcess);
}

/// <summary>
/// Internal implementation for the Copy Table Data APIs: for every source/target pair the schema
/// definitions of both tables are loaded, a copy statement is appended to one shared script via
/// <c>MaterializedDataScriptBuilder.CopyTableData</c>, and the script is then executed once on the
/// provided transaction.
/// </summary>
/// <param name="sqlTransaction">The transaction (and its connection) used for the schema lookups and the script execution.</param>
/// <param name="tablesToClone">The source/target table pairs to copy data between; at least one is required.</param>
/// <returns>A new <c>CloneTableInfo</c> (source &amp; target only) for each pair that was processed.</returns>
/// <exception cref="ArgumentException">No table pairs were specified, or the source or target table schema could not be resolved.</exception>
/// <exception cref="InvalidOperationException">The source and target of a pair are the same table.</exception>
protected async Task<CloneTableInfo[]> CopyTableDataInternalAsync(
    SqlTransaction sqlTransaction,
    IEnumerable<CloneTableInfo> tablesToClone
)
{
    sqlTransaction.AssertArgumentIsNotNull(nameof(sqlTransaction));

    var cloneInfoList = tablesToClone.ToList();

    if (cloneInfoList.IsNullOrEmpty())
        throw new ArgumentException("At least one source & target table pair must be specified.");

    var sqlScriptBuilder = MaterializedDataScriptBuilder.NewSqlScript();
    var cloneInfoResults = new List<CloneTableInfo>();
    foreach (var cloneInfo in cloneInfoList)
    {
        var sourceTable = cloneInfo.SourceTable;
        var targetTable = cloneInfo.TargetTable;

        //Copying data requires two distinct tables, so a pair whose Source & Target are the same
        // (e.g. Target was not explicitly specified) is rejected...
        if (targetTable.EqualsIgnoreCase(sourceTable))
            throw new InvalidOperationException($"The source table name {sourceTable.FullyQualifiedTableName} and target table name {targetTable.FullyQualifiedTableName} must be different.");

        var sourceTableSchemaDefinition = await GetTableSchemaDefinitionInternalAsync(TableSchemaDetailLevel.BasicDetails, sqlTransaction.Connection, sqlTransaction, sourceTable)
            .ConfigureAwait(false);
        if (sourceTableSchemaDefinition == null)
            throw new ArgumentException($"Could not resolve the source table schema for {sourceTable.FullyQualifiedTableName} on the provided connection.");

        var targetTableSchemaDefinition = await GetTableSchemaDefinitionInternalAsync(TableSchemaDetailLevel.BasicDetails, sqlTransaction.Connection, sqlTransaction, targetTable)
            .ConfigureAwait(false);
        //NOTE: The message must name the Target table (not the Source) since that is the schema that failed to resolve...
        if (targetTableSchemaDefinition == null)
            throw new ArgumentException($"Could not resolve the target table schema for {targetTable.FullyQualifiedTableName} on the provided connection.");

        //Both schema definitions are required by the script builder to determine what to copy...
        sqlScriptBuilder.CopyTableData(sourceTableSchemaDefinition, targetTableSchemaDefinition);

        cloneInfoResults.Add(new CloneTableInfo(sourceTable, targetTable));
    }

    //Execute the Script!
    await sqlTransaction
        .ExecuteMaterializedDataSqlScriptAsync(sqlScriptBuilder, BulkHelpersConfig.MaterializeDataStructureProcessingTimeoutSeconds)
        .ConfigureAwait(false);

    //If everything was successful then we can simply return the source & target pairs as they were all copied...
    return cloneInfoResults.AsArray();
}

#endregion

#region Drop Table API Methods

public async Task<TableNameTerm> DropTableAsync(SqlTransaction sqlTransaction, string tableNameOverride = null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public MaterializedDataScriptBuilder CloneTableWithAllElements(
CloneTableWithColumnsOnly(sourceTableDefinition.TableNameTerm, targetTable, ifExists);

if (copyDataFromSource)
CopyTableData(sourceTableDefinition, targetTable);
CopyTableDataForClone(sourceTableDefinition, targetTable);

//GET the Seed Value immediately, since there is a small chance of it changing...
if (cloneIdentitySeedValue && sourceTableDefinition.IdentityColumn != null)
Expand All @@ -106,54 +106,84 @@ public MaterializedDataScriptBuilder CloneTableWithAllElements(
return this;
}

public MaterializedDataScriptBuilder CopyTableData(SqlBulkHelpersTableDefinition sourceTableDefinition, TableNameTerm targetTable)
public MaterializedDataScriptBuilder CopyTableData(SqlBulkHelpersTableDefinition sourceTableDefinition, SqlBulkHelpersTableDefinition targetTableDefinition)
{
sourceTableDefinition.AssertArgumentIsNotNull(nameof(sourceTableDefinition));
targetTable.AssertArgumentIsNotNull(nameof(targetTable));
targetTableDefinition.AssertArgumentIsNotNull(nameof(targetTableDefinition));

bool hasIdentityColumn = sourceTableDefinition.IdentityColumn != null;
var targetColLookup = targetTableDefinition.TableColumns.ToLookup(c => new { c.ColumnName, c.DataType });

//In this overload we handle the Source Table Definition and can dynamically determine if there is An Identity column we handle by enabling insertion for them...
if (hasIdentityColumn)
{
ScriptBuilder.Append($@"
--The Table {sourceTableDefinition.TableFullyQualifiedName} has an Identity Column {sourceTableDefinition.IdentityColumn.ColumnName.QualifySqlTerm()} so we must allow Insertion of IDENTITY values to copy raw table data...
SET IDENTITY_INSERT {targetTable.FullyQualifiedTableName} ON;
");
}
var matchingColumnDefs = sourceTableDefinition.TableColumns
.Where(c => targetColLookup.Contains(new { c.ColumnName, c.DataType }))
.ToArray();

//Now we can Copy data between the two tables...
CopyTableData(sourceTableDefinition.TableNameTerm, targetTable, sourceTableDefinition.TableColumns.AsArray());
if (!matchingColumnDefs.Any())
throw new ArgumentException("There are no matching column definitions between the source & target table schema definitions provided; there must be at least one column matching on name & data type.");

//In this overload we handle the Source Table Definition and can dynamically determine if there is An Identity column we handle by enabling insertion for them...
if (hasIdentityColumn)
{
ScriptBuilder.Append($@"
--We now disable IDENTITY Inserts once all data is copied into {targetTable}...
SET IDENTITY_INSERT {targetTable.FullyQualifiedTableName} OFF;
");
}
//Now we can Copy data between the two tables on the matching columns detected and enable Identity Insert if the
// Target has an Identity Column which might be explicitly copied...
CopyTableData(
sourceTable: sourceTableDefinition.TableNameTerm,
targetTable: targetTableDefinition.TableNameTerm,
enableIdentityInsertOnTarget: targetTableDefinition.IdentityColumn != null,
matchingColumnDefs
);

return this;
}

/// <summary>
/// Appends the data copy script for a cloned table: all columns of the source table definition are
/// copied into the target table, and Identity Insert is enabled on the target whenever the source
/// definition has an Identity Column.
/// </summary>
/// <param name="sourceTableDefinition">Schema definition of the source table; supplies the table name, columns, and Identity Column.</param>
/// <param name="targetTable">Name of the table that receives the copied data.</param>
/// <param name="columnDefs">NOTE(review): not read by this method; the source definition's columns are always used.</param>
/// <returns>This script builder, to allow chaining.</returns>
public MaterializedDataScriptBuilder CopyTableDataForClone(SqlBulkHelpersTableDefinition sourceTableDefinition, TableNameTerm targetTable, params TableColumnDefinition[] columnDefs)
{
    sourceTableDefinition.AssertArgumentIsNotNull(nameof(sourceTableDefinition));
    targetTable.AssertArgumentIsNotNull(nameof(targetTable));

    var sourceTable = sourceTableDefinition.TableNameTerm;
    //The Source has an Identity Column (which was cloned) so the Target needs Identity Insert enabled for the copy...
    var sourceHasIdentityColumn = sourceTableDefinition.IdentityColumn != null;
    var allSourceColumns = sourceTableDefinition.TableColumns.AsArray();

    CopyTableData(sourceTable, targetTable, sourceHasIdentityColumn, allSourceColumns);

    return this;
}

public MaterializedDataScriptBuilder CopyTableData(TableNameTerm sourceTable, TableNameTerm targetTable, params TableColumnDefinition[] columnDefs)
public MaterializedDataScriptBuilder CopyTableData(TableNameTerm sourceTable, TableNameTerm targetTable, bool enableIdentityInsertOnTarget, params TableColumnDefinition[] columnDefs)
{
sourceTable.AssertArgumentIsNotNull(nameof(sourceTable));
targetTable.AssertArgumentIsNotNull(nameof(targetTable));

//Validate that we have Table Columns...
if (!columnDefs.HasAny())
throw new ArgumentException($"At least one valid column definition must be specified to denote what data to copy between tables.");
throw new ArgumentException("At least one valid column definition must be specified to denote what data to copy between tables.");

var columnNamesCsv = columnDefs.Select(c => c.ColumnName.QualifySqlTerm()).ToCsv();

//In this overload we handle the Source Table Definition and can dynamically determine if there is An Identity column we handle by enabling insertion for them...
if (enableIdentityInsertOnTarget)
{
ScriptBuilder.Append($@"
--The Table {targetTable.FullyQualifiedTableName} has an Identity Column so we must allow Insertion of IDENTITY values to copy raw table data...
SET IDENTITY_INSERT {targetTable.FullyQualifiedTableName} ON;
");
}

ScriptBuilder.Append($@"
--Syncs the Identity Seed value of the Target Table with the current value of the Source Table (captured into Variable at top of script)
INSERT INTO {targetTable.FullyQualifiedTableName} ({columnNamesCsv})
SELECT {columnNamesCsv}
FROM {sourceTable.FullyQualifiedTableName};
");

//In this overload we handle the Source Table Definition and can dynamically determine if there is An Identity column we handle by enabling insertion for them...
if (enableIdentityInsertOnTarget)
{
ScriptBuilder.Append($@"
--We now disable IDENTITY Inserts once all data is copied into {targetTable.FullyQualifiedTableName}...
SET IDENTITY_INSERT {targetTable.FullyQualifiedTableName} OFF;
");
}

return this;
}

Expand Down
Loading

0 comments on commit cac2c35

Please sign in to comment.