Skip to content

Commit

Permalink
fix: Refactor MailSender to use a queue
Browse files Browse the repository at this point in the history
  • Loading branch information
hez2010 committed Apr 14, 2024
1 parent 1306325 commit b72f995
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 35 deletions.
7 changes: 5 additions & 2 deletions src/GZCTF/Extensions/DatabaseSinkExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class DatabaseSink : ILogEventSink, IDisposable
DateTimeOffset _lastFlushTime = DateTimeOffset.FromUnixTimeSeconds(0);
readonly CancellationTokenSource _tokenSource = new();
readonly ConcurrentQueue<LogModel> _logBuffer = [];
readonly AsyncManualResetEvent _resetEvent = new();

public DatabaseSink(IServiceProvider serviceProvider)
{
Expand All @@ -41,6 +42,7 @@ public void Emit(LogEvent logEvent)
return;

_logBuffer.Enqueue(ToLogModel(logEvent));
_resetEvent.Set();
}

static LogModel ToLogModel(LogEvent logEvent)
Expand Down Expand Up @@ -71,6 +73,9 @@ async Task WriteToDatabase(CancellationToken token = default)
{
while (!token.IsCancellationRequested)
{
await _resetEvent.WaitAsync(token);
_resetEvent.Reset();

while (_logBuffer.TryDequeue(out LogModel? logModel))
lockedLogBuffer.Add(logModel);

Expand All @@ -91,8 +96,6 @@ async Task WriteToDatabase(CancellationToken token = default)
_lastFlushTime = DateTimeOffset.Now;
}
}

await Task.Delay(TimeSpan.FromSeconds(1), token);
}
}
catch (TaskCanceledException) { }
Expand Down
135 changes: 102 additions & 33 deletions src/GZCTF/Services/MailSender.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text;
using System.Collections.Concurrent;
using System.Text;
using GZCTF.Models.Internal;
using GZCTF.Services.Interface;
using MailKit.Net.Smtp;
Expand All @@ -9,54 +10,68 @@

namespace GZCTF.Services;

public class MailSender(
IOptions<EmailConfig> options,
IOptionsSnapshot<GlobalConfig> globalConfig,
ILogger<MailSender> logger,
IStringLocalizer<Program> localizer) : IMailSender
public sealed class MailSender : IMailSender, IDisposable
{
readonly EmailConfig? _options = options.Value;
private readonly ConcurrentQueue<MailContent> _mailQueue = new();
private readonly EmailConfig? _options;
private readonly IOptionsSnapshot<GlobalConfig> _globalConfig;
private readonly ILogger<MailSender> _logger;
private readonly IStringLocalizer<Program> _localizer;
private readonly SmtpClient? _smtpClient;
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly CancellationToken _cancellationToken;
private readonly AsyncManualResetEvent _resetEvent = new();
private bool _disposed;

public MailSender(
IOptions<EmailConfig> options,
IOptionsSnapshot<GlobalConfig> globalConfig,
ILogger<MailSender> logger,
IStringLocalizer<Program> localizer)
{
_globalConfig = globalConfig;
_logger = logger;
_localizer = localizer;
_options = options.Value;
_cancellationToken = _cancellationTokenSource.Token;

if (_options is { SendMailAddress: not null, Smtp.Host: not null, Smtp.Port: not null })
{
_smtpClient = new();
_smtpClient.AuthenticationMechanisms.Remove("XOAUTH2");
Task.Factory.StartNew(MailSenderWorker, _cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}

public async Task<bool> SendEmailAsync(string subject, string content, string to)
{
if (_options?.SendMailAddress is null ||
_options?.Smtp?.Host is null ||
_options?.Smtp?.Port is null)
return true;

var msg = new MimeMessage();
msg.From.Add(new MailboxAddress(_options.SendMailAddress, _options.SendMailAddress));
using var msg = new MimeMessage();
msg.From.Add(new MailboxAddress(_options!.SendMailAddress, _options.SendMailAddress));
msg.To.Add(new MailboxAddress(to, to));
msg.Subject = subject;
msg.Body = new TextPart(TextFormat.Html) { Text = content };

try
{
using var client = new SmtpClient();

await client.ConnectAsync(_options.Smtp.Host, _options.Smtp.Port.Value);
client.AuthenticationMechanisms.Remove("XOAUTH2");
await client.AuthenticateAsync(_options.UserName, _options.Password);
await client.SendAsync(msg);
await client.DisconnectAsync(true);
await _smtpClient!.SendAsync(msg, cancellationToken: _cancellationToken);

logger.SystemLog(Program.StaticLocalizer[nameof(Resources.Program.MailSender_SendMail), to],
_logger.SystemLog(Program.StaticLocalizer[nameof(Resources.Program.MailSender_SendMail), to],
TaskStatus.Success, LogLevel.Information);
return true;
}
catch (Exception e)
{
logger.LogError(e, Program.StaticLocalizer[nameof(Resources.Program.MailSender_MailSendFailed)]);
_logger.LogError(e, Program.StaticLocalizer[nameof(Resources.Program.MailSender_MailSendFailed)]);
return false;
}
}

public async Task SendUrlAsync(MailContent content)
{
var template = globalConfig.Value.EmailTemplate switch
var template = _globalConfig.Value.EmailTemplate switch
{
GlobalConfig.DefaultEmailTemplate => localizer[nameof(Resources.Program.MailSender_Template)],
_ => globalConfig.Value.EmailTemplate
GlobalConfig.DefaultEmailTemplate => _localizer[nameof(Resources.Program.MailSender_Template)],
_ => _globalConfig.Value.EmailTemplate
};

// TODO: use a string formatter library
Expand All @@ -69,14 +84,55 @@ public async Task SendUrlAsync(MailContent content)
.Replace("{userName}", content.UserName)
.Replace("{url}", content.Url)
.Replace("{nowtime}", content.Time)
.Replace("{platform}", $"{globalConfig.Value.Title}::CTF")
.Replace("{platform}", $"{_globalConfig.Value.Title}::CTF")
.ToString();

if (!await SendEmailAsync(content.Title, emailContent, content.Email))
logger.SystemLog(Program.StaticLocalizer[nameof(Resources.Program.MailSender_MailSendFailed)],
_logger.SystemLog(Program.StaticLocalizer[nameof(Resources.Program.MailSender_MailSendFailed)],
TaskStatus.Failed);
}

private async Task MailSenderWorker()
{
if (_smtpClient is null)
return;

while (!_cancellationToken.IsCancellationRequested)
{
await _resetEvent.WaitAsync(_cancellationToken);
_resetEvent.Reset();

try
{
if (!_smtpClient.IsConnected)
{
await _smtpClient.ConnectAsync(_options!.Smtp!.Host, _options.Smtp.Port!.Value, cancellationToken: _cancellationToken);
}

if (!_smtpClient.IsAuthenticated)
{
await _smtpClient.AuthenticateAsync(_options!.UserName, _options.Password, cancellationToken: _cancellationToken);
}

while (_mailQueue.TryDequeue(out var content))
{
await SendUrlAsync(content);
}
}
catch (Exception e)
{
// Failed to establish SMTP connection, clear the queue
_mailQueue.Clear();

_logger.LogError(e, Program.StaticLocalizer[nameof(Resources.Program.MailSender_MailSendFailed)]);
}
finally
{
await _smtpClient.DisconnectAsync(true, cancellationToken: _cancellationToken);
}
}
}

public bool SendConfirmEmailUrl(string? userName, string? email, string? confirmLink) =>
SendUrlIfPossible(userName, email, confirmLink, MailType.ConfirmEmail);

Expand All @@ -88,23 +144,36 @@ public bool SendResetPasswordUrl(string? userName, string? email, string? resetL

bool SendUrlIfPossible(string? userName, string? email, string? resetLink, MailType type)
{
if (_options?.SendMailAddress is null)
if (_smtpClient is null)
return false;

if (string.IsNullOrEmpty(userName) || string.IsNullOrEmpty(email) || string.IsNullOrEmpty(resetLink))
{
logger.SystemLog(Program.StaticLocalizer[nameof(Resources.Program.MailSender_InvalidRequest)],
_logger.SystemLog(Program.StaticLocalizer[nameof(Resources.Program.MailSender_InvalidRequest)],
TaskStatus.Failed);
return false;
}

var content = new MailContent(userName, email, resetLink, type, localizer);
var content = new MailContent(userName, email, resetLink, type, _localizer);

// do not await
Task _ = SendUrlAsync(content);
_mailQueue.Enqueue(content);
_resetEvent.Set();

return true;
}

~MailSender() => Dispose();

public void Dispose()
{
if (!_disposed)
{
_disposed = true;
_cancellationTokenSource.Cancel();
_smtpClient?.Dispose();
GC.SuppressFinalize(this);
}
}
}

/// <summary>
Expand Down
54 changes: 54 additions & 0 deletions src/GZCTF/Utils/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace GZCTF.Utils;

public sealed class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> _tcs = new();

public async Task WaitAsync(CancellationToken cancellationToken = default)
{
var tcs = _tcs;
var cancelTcs = new TaskCompletionSource<bool>();

cancellationToken.Register(
s => ((TaskCompletionSource<bool>)s!).TrySetCanceled(), cancelTcs);

await await Task.WhenAny(tcs.Task, cancelTcs.Task);
}

private async Task<bool> Delay(int milliseconds)
{
await Task.Delay(milliseconds);
return false;
}

public async Task<bool> WaitAsync(int milliseconds, CancellationToken cancellationToken = default)
{
var tcs = _tcs;
var cancelTcs = new TaskCompletionSource<bool>();

cancellationToken.Register(
s => ((TaskCompletionSource<bool>)s!).TrySetCanceled(), cancelTcs);

return await await Task.WhenAny(tcs.Task, cancelTcs.Task, Delay(milliseconds));
}

public void Set()
{
var tcs = _tcs;
Task.Factory.StartNew(s => ((TaskCompletionSource<bool>)s!).TrySetResult(true),
tcs, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
tcs.Task.Wait();
}

public void Reset()
{
var newTcs = new TaskCompletionSource<bool>();
while (true)
{
var tcs = _tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref _tcs, newTcs, tcs) == tcs)
return;
}
}
}

0 comments on commit b72f995

Please sign in to comment.