Skip to content

Commit

Permalink
Merge pull request #1919 from preardon/issue/1918-Outbox-TXN-Issues
Browse files Browse the repository at this point in the history
Fix Outbox Transaction Issues
  • Loading branch information
preardon authored Dec 23, 2021
2 parents 60d09ab + d86baee commit e9d7d86
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Identity;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore.Diagnostics;

namespace Greetings.Adaptors.Data
{
public class AzureAdAuthenticationDbConnectionInterceptor : DbConnectionInterceptor
{
// See https://docs.microsoft.com/azure/active-directory/managed-identities-azure-resources/services-support-managed-identities#azure-sql
private static readonly string[] _azureSqlScopes = new[] { "https://database.windows.net//.default" };
private const int _cacheLifeTime = 5;

private static readonly TokenCredential _credential = new ChainedTokenCredential(
new ManagedIdentityCredential(),
new VisualStudioCredential());

private static AccessToken _token;
private static SemaphoreSlim _semaphoreToken = new SemaphoreSlim(1, 1);

public override InterceptionResult ConnectionOpening(
DbConnection connection,
ConnectionEventData eventData,
InterceptionResult result)
{
var sqlConnection = (SqlConnection)connection;
if (DoesConnectionNeedAccessToken(sqlConnection))
sqlConnection.AccessToken = GetAccessToken();

return base.ConnectionOpening(connection, eventData, result);
}

public override async ValueTask<InterceptionResult> ConnectionOpeningAsync(
DbConnection connection,
ConnectionEventData eventData,
InterceptionResult result,
CancellationToken cancellationToken = default)
{
var sqlConnection = (SqlConnection)connection;
if (DoesConnectionNeedAccessToken(sqlConnection))
sqlConnection.AccessToken = await GetAccessTokenAsync();

return await base.ConnectionOpeningAsync(connection, eventData, result, cancellationToken);
}

private static bool DoesConnectionNeedAccessToken(SqlConnection connection)
{
//
// Only try to get a token from AAD if
// - We connect to an Azure SQL instance; and
// - The connection doesn't specify a username.
//
var connectionStringBuilder = new SqlConnectionStringBuilder(connection.ConnectionString);

return connectionStringBuilder.DataSource.Contains("database.windows.net", StringComparison.OrdinalIgnoreCase) && string.IsNullOrEmpty(connectionStringBuilder.UserID);
}

private string GetAccessToken()
{
_semaphoreToken.Wait();
try
{
//If the Token has more than 5 minutes Validity
if (DateTime.UtcNow.AddMinutes(_cacheLifeTime) <= _token.ExpiresOn.UtcDateTime)
return _token.Token;

var tokenRequestContext = new TokenRequestContext(_azureSqlScopes);
var token = _credential.GetToken(tokenRequestContext, CancellationToken.None);

_token = token;

return token.Token;
}
finally
{
_semaphoreToken.Release();
}
}

private async Task<string> GetAccessTokenAsync()
{
await _semaphoreToken.WaitAsync();
try
{
//If the Token has more than 5 minutes Validity
if (DateTime.UtcNow.AddMinutes(_cacheLifeTime) <= _token.ExpiresOn.UtcDateTime)
return _token.Token;

var tokenRequestContext = new TokenRequestContext(_azureSqlScopes);
var token = await _credential.GetTokenAsync(tokenRequestContext, CancellationToken.None);

_token = token;

return token.Token;
}
finally
{
_semaphoreToken.Release();
}
}
}
}
1 change: 1 addition & 0 deletions samples/ASBTaskQueue/Greetings/Greetings.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.5.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.11" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.11" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.11" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public async Task<IActionResult> DepositMessage()
var greetingAsync = new GreetingAsyncEvent("Deposit Hello from the web");
var greeting = new GreetingEvent("Deposit Hello from the web");

await _commandProcessor.DepositPostAsync(greetingAsync);

_context.Greetings.Add(greeting);
_context.GreetingsAsync.Add(greetingAsync);
await _context.SaveChangesAsync();
Expand Down
3 changes: 2 additions & 1 deletion samples/ASBTaskQueue/GreetingsSender.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
builder.Services.AddDbContext<GreetingsDataContext>(o =>
{
o.UseSqlServer(dbConnString);
//o.AddInterceptors(new AzureAdAuthenticationDbConnectionInterceptor());
});

//Services
Expand Down Expand Up @@ -67,7 +68,7 @@
var services = serviceScope.ServiceProvider;
var dbContext = services.GetService<GreetingsDataContext>();

dbContext.Database.EnsureCreated();
//dbContext.Database.EnsureCreated();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
Expand All @@ -20,14 +21,16 @@ public MsSqlEntityFrameworkCoreConnectionProvider(T context)

public SqlConnection GetConnection()
{
//This line ensure that the connection has been initialised and that any required interceptors have been run before getting the connection
_context.Database.CanConnect();
return (SqlConnection)_context.Database.GetDbConnection();
}

public Task<SqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default(CancellationToken))
public async Task<SqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default(CancellationToken))
{
var tcs = new TaskCompletionSource<SqlConnection>();
tcs.SetResult(GetConnection());
return tcs.Task;
//This line ensure that the connection has been initialised and that any required interceptors have been run before getting the connection
await _context.Database.CanConnectAsync(cancellationToken);
return (SqlConnection)_context.Database.GetDbConnection();
}

public SqlTransaction GetTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public MySqlEntityFrameworkConnectionProvider(T context)
/// <returns>The Sqlite Connection that is in use</returns>
public MySqlConnection GetConnection()
{
//This line ensure that the connection has been initialised and that any required interceptors have been run before getting the connection
_context.Database.CanConnect();
return (MySqlConnection) _context.Database.GetDbConnection();
}

Expand All @@ -37,11 +39,11 @@ public MySqlConnection GetConnection()
/// </summary>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns></returns>
public Task<MySqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default(CancellationToken))
public async Task<MySqlConnection> GetConnectionAsync(CancellationToken cancellationToken = default(CancellationToken))
{
var tcs = new TaskCompletionSource<MySqlConnection>();
tcs.SetResult((MySqlConnection)_context.Database.GetDbConnection());
return tcs.Task;
//This line ensure that the connection has been initialised and that any required interceptors have been run before getting the connection
await _context.Database.CanConnectAsync(cancellationToken);
return (MySqlConnection)_context.Database.GetDbConnection();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public SqliteEntityFrameworkConnectionProvider(T context)
/// <returns>The Sqlite Connection that is in use</returns>
public SqliteConnection GetConnection()
{
//This line ensure that the connection has been initialised and that any required interceptors have been run before getting the connection
_context.Database.CanConnect();
return (SqliteConnection) _context.Database.GetDbConnection();
}

Expand All @@ -37,11 +39,11 @@ public SqliteConnection GetConnection()
/// </summary>
/// <param name="cancellationToken">A cancellation token</param>
/// <returns></returns>
public Task<SqliteConnection> GetConnectionAsync(CancellationToken cancellationToken = default(CancellationToken))
public async Task<SqliteConnection> GetConnectionAsync(CancellationToken cancellationToken = default(CancellationToken))
{
var tcs = new TaskCompletionSource<SqliteConnection>();
tcs.SetResult((SqliteConnection)_context.Database.GetDbConnection());
return tcs.Task;
//This line ensure that the connection has been initialised and that any required interceptors have been run before getting the connection
await _context.Database.CanConnectAsync(cancellationToken);
return (SqliteConnection)_context.Database.GetDbConnection();
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ private async Task<Guid> DepositPostAsync<T>(T request, IAmABoxTransactionConnec

var message = messageMapper.MapToMessage(request);

await _bus.AddToOutboxAsync(request, continueOnCapturedContext, cancellationToken, message, _boxTransactionConnectionProvider);
await _bus.AddToOutboxAsync(request, continueOnCapturedContext, cancellationToken, message, connectionProvider);

return message.Id;
}
Expand Down

0 comments on commit e9d7d86

Please sign in to comment.