From 821e01723ed07630563736fbc3e39db08f13ae48 Mon Sep 17 00:00:00 2001 From: Michael Pendon Date: Fri, 2 Oct 2020 11:32:35 +0200 Subject: [PATCH] #343 CancellationToken Support for SQL Server BulkDelete --- .../BaseRepository/BulkDelete.cs | 22 ++- .../DbRepository/BulkDelete.cs | 57 +++++-- .../SqlConnection/BulkDelete.cs | 152 +++++++++++------- .../BaseRepository/BulkDelete.cs | 22 ++- .../DbRepository/BulkDelete.cs | 57 +++++-- .../SqlConnection/BulkDelete.cs | 152 +++++++++++------- 6 files changed, 300 insertions(+), 162 deletions(-) diff --git a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/BaseRepository/BulkDelete.cs b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/BaseRepository/BulkDelete.cs index 2c243bb7d..f69069b99 100644 --- a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/BaseRepository/BulkDelete.cs +++ b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/BaseRepository/BulkDelete.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq.Expressions; +using System.Threading; using System.Threading.Tasks; namespace RepoDb @@ -126,20 +127,23 @@ public static int BulkDelete(this BaseRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static Task BulkDeleteAsync(this BaseRepository repository, IEnumerable primaryKeys, string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return repository.DbRepository.BulkDeleteAsync(primaryKeys: primaryKeys, hints: hints, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -155,6 +159,7 @@ public static Task BulkDeleteAsync(this BaseRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static Task BulkDeleteAsync(this BaseRepository repository, IEnumerable entities, @@ -164,7 +169,8 @@ public static Task BulkDeleteAsync(this BaseRepository(entities: entities, @@ -174,7 +180,8 @@ public static Task BulkDeleteAsync(this BaseRepository @@ -191,6 +198,7 @@ public static Task BulkDeleteAsync(this BaseRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static Task BulkDeleteAsync(this BaseRepository repository, string tableName, @@ -201,7 +209,8 @@ public static Task BulkDeleteAsync(this BaseRepository(tableName: tableName, @@ -212,7 +221,8 @@ public static Task BulkDeleteAsync(this BaseRepository repository, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, IEnumerable primaryKeys, string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { // Create a connection @@ -520,7 +523,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, IEnumerable entities, @@ -562,7 +567,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -623,7 +631,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, DbDataReader reader, @@ -683,7 +694,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, DataTable dataTable, @@ -744,7 +758,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -803,7 +820,8 @@ public static async Task BulkDeleteAsync(this DbRepository r string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Create a connection var connection = (transaction?.Connection ?? repository.CreateConnection()); @@ -817,7 +835,8 @@ public static async Task BulkDeleteAsync(this DbRepository r bulkCopyTimeout: repository.CommandTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } catch { @@ -850,6 +869,7 @@ public static async Task BulkDeleteAsync(this DbRepository r /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -860,7 +880,8 @@ public static async Task BulkDeleteAsync(this DbRepository r string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Create a connection var connection = (transaction?.Connection ?? repository.CreateConnection()); @@ -877,7 +898,8 @@ public static async Task BulkDeleteAsync(this DbRepository r bulkCopyTimeout: repository.CommandTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } catch { @@ -911,6 +933,7 @@ public static async Task BulkDeleteAsync(this DbRepository r /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -922,7 +945,8 @@ public static async Task BulkDeleteAsync(this DbRepository r string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Create a connection var connection = (transaction?.Connection ?? repository.CreateConnection()); @@ -940,7 +964,8 @@ public static async Task BulkDeleteAsync(this DbRepository r bulkCopyTimeout: repository.CommandTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } catch { diff --git a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/SqlConnection/BulkDelete.cs b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/SqlConnection/BulkDelete.cs index 2287dc39a..8b1c49e01 100644 --- a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/SqlConnection/BulkDelete.cs +++ b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/Microsoft.Data.SqlClient/SqlConnection/BulkDelete.cs @@ -6,6 +6,7 @@ using System.Data.Common; using System.Linq; using System.Linq.Expressions; +using System.Threading; using System.Threading.Tasks; namespace RepoDb @@ -354,6 +355,7 @@ public static int BulkDelete(this SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, IEnumerable primaryKeys, @@ -361,7 +363,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return await BulkDeleteAsyncInternal(connection: connection, @@ -371,7 +374,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -388,6 +392,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, IEnumerable entities, @@ -398,7 +403,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { using (var reader = new DataEntityDataReader(entities)) @@ -414,7 +420,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } } @@ -433,6 +440,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -444,7 +452,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { using (var reader = new DataEntityDataReader(entities)) @@ -460,7 +469,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } } @@ -478,6 +488,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, DbDataReader reader, @@ -488,7 +499,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return await BulkDeleteAsyncInternal(connection: connection, @@ -501,7 +513,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -519,6 +532,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, DataTable dataTable, @@ -530,7 +544,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return await BulkDeleteAsyncInternal(connection: connection, @@ -544,7 +559,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } #endregion @@ -562,6 +578,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -570,7 +587,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { return await BulkDeleteAsyncInternal(connection: connection, tableName: tableName, @@ -579,7 +597,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -596,6 +615,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -607,7 +627,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { return await BulkDeleteAsyncInternal(connection: connection, tableName: tableName, @@ -619,7 +640,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -637,6 +659,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -649,7 +672,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { return await BulkDeleteAsyncInternal(connection: connection, tableName: tableName, @@ -662,7 +686,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } #endregion @@ -1212,6 +1237,7 @@ internal static int BulkDeleteInternal(SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. internal static async Task BulkDeleteAsyncInternal(SqlConnection connection, string tableName, @@ -1220,7 +1246,8 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Variables var dbSetting = connection.GetDbSetting(); @@ -1231,7 +1258,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection if (transaction == null) { // Add the transaction if not present - transaction = (SqlTransaction)(await connection.EnsureOpenAsync()).BeginTransaction(); + transaction = (SqlTransaction)(await connection.EnsureOpenAsync(cancellationToken)).BeginTransaction(); } else { @@ -1254,7 +1281,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection try { // Get the DB Fields - var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true); + var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true, cancellationToken); // Variables needed var primaryDbField = dbFields?.FirstOrDefault(dbField => dbField.IsPrimary); @@ -1273,7 +1300,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection tempTableName, primaryOrIdentityField.AsEnumerable(), dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Do the bulk insertion first using (var dataTable = CreateDataTableWithSingleColumn(primaryOrIdentityField, primaryKeys)) @@ -1289,14 +1316,15 @@ await BulkInsertAsyncInternal(connection, options: options, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } // Create the clustered index sql = GetCreateTemporaryTableClusteredIndexSqlText(tempTableName, primaryOrIdentityField.AsEnumerable(), dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Delete the actual delete sql = GetBulkDeleteSqlText(tableName, @@ -1304,11 +1332,11 @@ await BulkInsertAsyncInternal(connection, primaryOrIdentityField.AsEnumerable(), hints, dbSetting); - result = connection.ExecuteNonQuery(sql, commandTimeout: bulkCopyTimeout, transaction: transaction); + result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction, cancellationToken: cancellationToken); // Drop the table after used sql = GetDropTemporaryTableSqlText(tempTableName, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Commit the transaction if (hasTransaction == false) @@ -1354,6 +1382,7 @@ await BulkInsertAsyncInternal(connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. internal static async Task BulkDeleteAsyncInternal(SqlConnection connection, string tableName, @@ -1365,7 +1394,8 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Variables var dbSetting = connection.GetDbSetting(); @@ -1376,7 +1406,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection if (transaction == null) { // Add the transaction if not present - transaction = (SqlTransaction)(await connection.EnsureOpenAsync()).BeginTransaction(); + transaction = (SqlTransaction)(await connection.EnsureOpenAsync(cancellationToken)).BeginTransaction(); } else { @@ -1399,7 +1429,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection try { // Get the DB Fields - var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true); + var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true, cancellationToken); // Variables needed var readerFields = Enumerable.Range(0, reader.FieldCount) @@ -1449,7 +1479,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection tempTableName, fields, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Set the options to KeepIdentity if needed if (options == null && identityDbField?.IsIdentity == true && @@ -1471,24 +1501,25 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection fields?.Any(field => string.Equals(field.Name, dbField.Name, StringComparison.OrdinalIgnoreCase)) == true); // Do the bulk insertion first - await BulkInsertAsyncInternal(connection, - tempTableName, - reader, - filteredDbFields, - mappings, - options, - null, + await BulkInsertAsyncInternal(connection: connection, + tableName: tempTableName, + reader: reader, + dbFields: filteredDbFields, + mappings: mappings, + options: options, + hints: null, bulkCopyTimeout, batchSize, - false, - false, - transaction); + isReturnIdentity: false, + usePhysicalPseudoTempTable: false, + transaction: transaction, + cancellationToken: cancellationToken); // Create the clustered index sql = GetCreateTemporaryTableClusteredIndexSqlText(tempTableName, qualifiers, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Delete the actual delete sql = GetBulkDeleteSqlText(tableName, @@ -1496,11 +1527,11 @@ await BulkInsertAsyncInternal(connection, qualifiers, hints, dbSetting); - result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction); + result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction, cancellationToken: cancellationToken); // Drop the table after used sql = GetDropTemporaryTableSqlText(tempTableName, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Commit the transaction if (hasTransaction == false) @@ -1547,6 +1578,7 @@ await BulkInsertAsyncInternal(connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. internal static async Task BulkDeleteAsyncInternal(SqlConnection connection, string tableName, @@ -1559,7 +1591,8 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Variables var dbSetting = connection.GetDbSetting(); @@ -1570,7 +1603,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection if (transaction == null) { // Add the transaction if not present - transaction = (SqlTransaction)(await connection.EnsureOpenAsync()).BeginTransaction(); + transaction = (SqlTransaction)(await connection.EnsureOpenAsync(cancellationToken)).BeginTransaction(); } else { @@ -1593,7 +1626,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection try { // Get the DB Fields - var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true); + var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true, cancellationToken); // Variables needed var tableFields = Enumerable.Range(0, dataTable.Columns.Count) @@ -1643,7 +1676,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection tempTableName, fields, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Set the options to KeepIdentity if needed if (options == null && identityDbField?.IsIdentity == true && @@ -1665,25 +1698,26 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection fields?.Any(field => string.Equals(field.Name, dbField.Name, StringComparison.OrdinalIgnoreCase)) == true); // Do the bulk insertion first - await BulkInsertAsyncInternal(connection, - tempTableName, - dataTable, - rowState, - filteredDbFields, - mappings, - options, - null, + await BulkInsertAsyncInternal(connection: connection, + tableName: tempTableName, + dataTable: dataTable, + rowState: rowState, + dbFields: filteredDbFields, + mappings: mappings, + options: options, + hints: null, bulkCopyTimeout, batchSize, - false, - false, - transaction); + isReturnIdentity: false, + usePhysicalPseudoTempTable: false, + transaction: transaction, + cancellationToken: cancellationToken); // Create the clustered index sql = GetCreateTemporaryTableClusteredIndexSqlText(tempTableName, qualifiers, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Delete the actual delete sql = GetBulkDeleteSqlText(tableName, @@ -1691,11 +1725,11 @@ await BulkInsertAsyncInternal(connection, qualifiers, hints, dbSetting); - result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction); + result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction, cancellationToken: cancellationToken); // Drop the table after used sql = GetDropTemporaryTableSqlText(tempTableName, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Commit the transaction if (hasTransaction == false) diff --git a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/BaseRepository/BulkDelete.cs b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/BaseRepository/BulkDelete.cs index 1e00be0da..c53c869e1 100644 --- a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/BaseRepository/BulkDelete.cs +++ b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/BaseRepository/BulkDelete.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Data.SqlClient; using System.Linq.Expressions; +using System.Threading; using System.Threading.Tasks; namespace RepoDb @@ -126,20 +127,23 @@ public static int BulkDelete(this BaseRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static Task BulkDeleteAsync(this BaseRepository repository, IEnumerable primaryKeys, string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return repository.DbRepository.BulkDeleteAsync(primaryKeys: primaryKeys, hints: hints, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -155,6 +159,7 @@ public static Task BulkDeleteAsync(this BaseRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static Task BulkDeleteAsync(this BaseRepository repository, IEnumerable entities, @@ -164,7 +169,8 @@ public static Task BulkDeleteAsync(this BaseRepository(entities: entities, @@ -174,7 +180,8 @@ public static Task BulkDeleteAsync(this BaseRepository @@ -191,6 +198,7 @@ public static Task BulkDeleteAsync(this BaseRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static Task BulkDeleteAsync(this BaseRepository repository, string tableName, @@ -201,7 +209,8 @@ public static Task BulkDeleteAsync(this BaseRepository(tableName: tableName, @@ -212,7 +221,8 @@ public static Task BulkDeleteAsync(this BaseRepository repository, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, IEnumerable primaryKeys, string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { // Create a connection @@ -520,7 +523,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, IEnumerable entities, @@ -562,7 +567,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -623,7 +631,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, DbDataReader reader, @@ -683,7 +694,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, DataTable dataTable, @@ -744,7 +758,8 @@ public static async Task BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepository BulkDeleteAsync(this DbRepositoryThe size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -803,7 +820,8 @@ public static async Task BulkDeleteAsync(this DbRepository r string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Create a connection var connection = (transaction?.Connection ?? repository.CreateConnection()); @@ -817,7 +835,8 @@ public static async Task BulkDeleteAsync(this DbRepository r bulkCopyTimeout: repository.CommandTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } catch { @@ -850,6 +869,7 @@ public static async Task BulkDeleteAsync(this DbRepository r /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -860,7 +880,8 @@ public static async Task BulkDeleteAsync(this DbRepository r string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Create a connection var connection = (transaction?.Connection ?? repository.CreateConnection()); @@ -877,7 +898,8 @@ public static async Task BulkDeleteAsync(this DbRepository r bulkCopyTimeout: repository.CommandTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } catch { @@ -911,6 +933,7 @@ public static async Task BulkDeleteAsync(this DbRepository r /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this DbRepository repository, string tableName, @@ -922,7 +945,8 @@ public static async Task BulkDeleteAsync(this DbRepository r string hints = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Create a connection var connection = (transaction?.Connection ?? repository.CreateConnection()); @@ -940,7 +964,8 @@ public static async Task BulkDeleteAsync(this DbRepository r bulkCopyTimeout: repository.CommandTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } catch { diff --git a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/SqlConnection/BulkDelete.cs b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/SqlConnection/BulkDelete.cs index 434ef56da..9debfe60a 100644 --- a/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/SqlConnection/BulkDelete.cs +++ b/RepoDb.Extensions/RepoDb.SqlServer.BulkOperations/RepoDb.SqlServer.BulkOperations/System.Data.SqlClient/SqlConnection/BulkDelete.cs @@ -6,6 +6,7 @@ using System.Data.SqlClient; using System.Linq; using System.Linq.Expressions; +using System.Threading; using System.Threading.Tasks; namespace RepoDb @@ -354,6 +355,7 @@ public static int BulkDelete(this SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, IEnumerable primaryKeys, @@ -361,7 +363,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return await BulkDeleteAsyncInternal(connection: connection, @@ -371,7 +374,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -388,6 +392,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, IEnumerable entities, @@ -398,7 +403,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { using (var reader = new DataEntityDataReader(entities)) @@ -414,7 +420,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } } @@ -433,6 +440,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -444,7 +452,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { using (var reader = new DataEntityDataReader(entities)) @@ -460,7 +469,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } } @@ -478,6 +488,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, DbDataReader reader, @@ -488,7 +499,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return await BulkDeleteAsyncInternal(connection: connection, @@ -501,7 +513,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -519,6 +532,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, DataTable dataTable, @@ -530,7 +544,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) where TEntity : class { return await BulkDeleteAsyncInternal(connection: connection, @@ -544,7 +559,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connec bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } #endregion @@ -562,6 +578,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connec /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -570,7 +587,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { return await BulkDeleteAsyncInternal(connection: connection, tableName: tableName, @@ -579,7 +597,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -596,6 +615,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -607,7 +627,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { return await BulkDeleteAsyncInternal(connection: connection, tableName: tableName, @@ -619,7 +640,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } /// @@ -637,6 +659,7 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. public static async Task BulkDeleteAsync(this SqlConnection connection, string tableName, @@ -649,7 +672,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { return await BulkDeleteAsyncInternal(connection: connection, tableName: tableName, @@ -662,7 +686,8 @@ public static async Task BulkDeleteAsync(this SqlConnection connection, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, usePhysicalPseudoTempTable: usePhysicalPseudoTempTable, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } #endregion @@ -1212,6 +1237,7 @@ internal static int BulkDeleteInternal(SqlConnection connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. internal static async Task BulkDeleteAsyncInternal(SqlConnection connection, string tableName, @@ -1220,7 +1246,8 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Variables var dbSetting = connection.GetDbSetting(); @@ -1231,7 +1258,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection if (transaction == null) { // Add the transaction if not present - transaction = (SqlTransaction)(await connection.EnsureOpenAsync()).BeginTransaction(); + transaction = (SqlTransaction)(await connection.EnsureOpenAsync(cancellationToken)).BeginTransaction(); } else { @@ -1254,7 +1281,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection try { // Get the DB Fields - var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true); + var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true, cancellationToken); // Variables needed var primaryDbField = dbFields?.FirstOrDefault(dbField => dbField.IsPrimary); @@ -1273,7 +1300,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection tempTableName, primaryOrIdentityField.AsEnumerable(), dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Do the bulk insertion first using (var dataTable = CreateDataTableWithSingleColumn(primaryOrIdentityField, primaryKeys)) @@ -1289,14 +1316,15 @@ await BulkInsertAsyncInternal(connection, options: options, bulkCopyTimeout: bulkCopyTimeout, batchSize: batchSize, - transaction: transaction); + transaction: transaction, + cancellationToken: cancellationToken); } // Create the clustered index sql = GetCreateTemporaryTableClusteredIndexSqlText(tempTableName, primaryOrIdentityField.AsEnumerable(), dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Delete the actual delete sql = GetBulkDeleteSqlText(tableName, @@ -1304,11 +1332,11 @@ await BulkInsertAsyncInternal(connection, primaryOrIdentityField.AsEnumerable(), hints, dbSetting); - result = connection.ExecuteNonQuery(sql, commandTimeout: bulkCopyTimeout, transaction: transaction); + result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction, cancellationToken: cancellationToken); // Drop the table after used sql = GetDropTemporaryTableSqlText(tempTableName, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Commit the transaction if (hasTransaction == false) @@ -1354,6 +1382,7 @@ await BulkInsertAsyncInternal(connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. internal static async Task BulkDeleteAsyncInternal(SqlConnection connection, string tableName, @@ -1365,7 +1394,8 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Variables var dbSetting = connection.GetDbSetting(); @@ -1376,7 +1406,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection if (transaction == null) { // Add the transaction if not present - transaction = (SqlTransaction)(await connection.EnsureOpenAsync()).BeginTransaction(); + transaction = (SqlTransaction)(await connection.EnsureOpenAsync(cancellationToken)).BeginTransaction(); } else { @@ -1399,7 +1429,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection try { // Get the DB Fields - var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true); + var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true, cancellationToken); // Variables needed var readerFields = Enumerable.Range(0, reader.FieldCount) @@ -1449,7 +1479,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection tempTableName, fields, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Set the options to KeepIdentity if needed if (options == null && identityDbField?.IsIdentity == true && @@ -1471,24 +1501,25 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection fields?.Any(field => string.Equals(field.Name, dbField.Name, StringComparison.OrdinalIgnoreCase)) == true); // Do the bulk insertion first - await BulkInsertAsyncInternal(connection, - tempTableName, - reader, - filteredDbFields, - mappings, - options, - null, + await BulkInsertAsyncInternal(connection: connection, + tableName: tempTableName, + reader: reader, + dbFields: filteredDbFields, + mappings: mappings, + options: options, + hints: null, bulkCopyTimeout, batchSize, - false, - false, - transaction); + isReturnIdentity: false, + usePhysicalPseudoTempTable: false, + transaction: transaction, + cancellationToken: cancellationToken); // Create the clustered index sql = GetCreateTemporaryTableClusteredIndexSqlText(tempTableName, qualifiers, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Delete the actual delete sql = GetBulkDeleteSqlText(tableName, @@ -1496,11 +1527,11 @@ await BulkInsertAsyncInternal(connection, qualifiers, hints, dbSetting); - result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction); + result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction, cancellationToken: cancellationToken); // Drop the table after used sql = GetDropTemporaryTableSqlText(tempTableName, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Commit the transaction if (hasTransaction == false) @@ -1547,6 +1578,7 @@ await BulkInsertAsyncInternal(connection, /// The size per batch to be used. /// The flags that signify whether to create a physical pseudo table. /// The transaction to be used. + /// The object to be used during the asynchronous operation. /// The number of rows affected by the execution. internal static async Task BulkDeleteAsyncInternal(SqlConnection connection, string tableName, @@ -1559,7 +1591,8 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection int? bulkCopyTimeout = null, int? batchSize = null, bool? usePhysicalPseudoTempTable = null, - SqlTransaction transaction = null) + SqlTransaction transaction = null, + CancellationToken cancellationToken = default) { // Variables var dbSetting = connection.GetDbSetting(); @@ -1570,7 +1603,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection if (transaction == null) { // Add the transaction if not present - transaction = (SqlTransaction)(await connection.EnsureOpenAsync()).BeginTransaction(); + transaction = (SqlTransaction)(await connection.EnsureOpenAsync(cancellationToken)).BeginTransaction(); } else { @@ -1593,7 +1626,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection try { // Get the DB Fields - var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true); + var dbFields = await DbFieldCache.GetAsync(connection, tableName, transaction, true, cancellationToken); // Variables needed var tableFields = Enumerable.Range(0, dataTable.Columns.Count) @@ -1643,7 +1676,7 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection tempTableName, fields, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Set the options to KeepIdentity if needed if (options == null && identityDbField?.IsIdentity == true && @@ -1665,25 +1698,26 @@ internal static async Task BulkDeleteAsyncInternal(SqlConnection connection fields?.Any(field => string.Equals(field.Name, dbField.Name, StringComparison.OrdinalIgnoreCase)) == true); // Do the bulk insertion first - await BulkInsertAsyncInternal(connection, - tempTableName, - dataTable, - rowState, - filteredDbFields, - mappings, - options, - null, + await BulkInsertAsyncInternal(connection: connection, + tableName: tempTableName, + dataTable: dataTable, + rowState: rowState, + dbFields: filteredDbFields, + mappings: mappings, + options: options, + hints: null, bulkCopyTimeout, batchSize, - false, - false, - transaction); + isReturnIdentity: false, + usePhysicalPseudoTempTable: false, + transaction: transaction, + cancellationToken: cancellationToken); // Create the clustered index sql = GetCreateTemporaryTableClusteredIndexSqlText(tempTableName, qualifiers, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Delete the actual delete sql = GetBulkDeleteSqlText(tableName, @@ -1691,11 +1725,11 @@ await BulkInsertAsyncInternal(connection, qualifiers, hints, dbSetting); - result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction); + result = await connection.ExecuteNonQueryAsync(sql, commandTimeout: bulkCopyTimeout, transaction: transaction, cancellationToken: cancellationToken); // Drop the table after used sql = GetDropTemporaryTableSqlText(tempTableName, dbSetting); - await connection.ExecuteNonQueryAsync(sql, transaction: transaction); + await connection.ExecuteNonQueryAsync(sql, transaction: transaction, cancellationToken: cancellationToken); // Commit the transaction if (hasTransaction == false)