Skip to content

Commit

Permalink
feat: non-blocking cache update
Browse files Browse the repository at this point in the history
  • Loading branch information
GZTimeWalker committed May 4, 2023
1 parent 2950cbb commit 9ffa58b
Show file tree
Hide file tree
Showing 17 changed files with 275 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/GZCTF.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ Global
SolutionGuid = {643F6AF8-C22F-4E8E-9AB2-504B7C560CD6}
EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
version = 0.13.4
version = 0.14.0
EndGlobalSection
EndGlobal
6 changes: 3 additions & 3 deletions src/GZCTF/Controllers/EditController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public async Task<IActionResult> UpdateGame([FromRoute] int id, [FromBody] GameI
game.Update(model);
await gameRepository.SaveAsync(token);
gameRepository.FlushGameInfoCache();
gameRepository.FlushScoreboardCache(game.Id);
await gameRepository.FlushScoreboardCache(game.Id, token);

return Ok(GameInfoModel.FromGame(game));
}
Expand Down Expand Up @@ -534,7 +534,7 @@ await gameNoticeRepository.AddNotice(new()
}

// always flush scoreboard
gameRepository.FlushScoreboardCache(game.Id);
await gameRepository.FlushScoreboardCache(game.Id, token);

return Ok(ChallengeEditDetailModel.FromChallenge(res));
}
Expand Down Expand Up @@ -657,7 +657,7 @@ public async Task<IActionResult> RemoveGameChallenge([FromRoute] int id, [FromRo
await challengeRepository.RemoveChallenge(res, token);

// always flush scoreboard
gameRepository.FlushScoreboardCache(game.Id);
await gameRepository.FlushScoreboardCache(game.Id, token);

return Ok();
}
Expand Down
2 changes: 1 addition & 1 deletion src/GZCTF/Controllers/GameController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ public async Task<IActionResult> CreateContainer([FromRoute] int id, [FromRoute]

return await instanceRepository.CreateContainer(instance, context.Participation!.Team, context.User!, context.Game!.ContainerCountLimit, token) switch
{
null or (TaskStatus.Fail, null) => BadRequest(new RequestResponse("题目创建容器失败")),
null or (TaskStatus.Failed, null) => BadRequest(new RequestResponse("题目创建容器失败")),
(TaskStatus.Denied, null) => BadRequest(new RequestResponse($"队伍容器数目不能超过 {context.Game.ContainerCountLimit}")),
(TaskStatus.Success, var x) => Ok(ContainerInfoModel.FromContainer(x!)),
_ => throw new NotImplementedException(),
Expand Down
2 changes: 2 additions & 0 deletions src/GZCTF/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@
builder.Services.AddScoped<IConfigService, ConfigService>();

builder.Services.AddChannel<Submission>();
builder.Services.AddChannel<CacheRequest>();
builder.Services.AddHostedService<FlagChecker>();
builder.Services.AddHostedService<CacheMaker>();
builder.Services.AddHostedService<ContainerChecker>();

#endregion Services and Repositories
Expand Down
2 changes: 1 addition & 1 deletion src/GZCTF/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
}
}
}
}
}
46 changes: 40 additions & 6 deletions src/GZCTF/Repositories/GameRepository.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System.Text;
using System.Threading.Channels;
using CTFServer.Models.Request.Game;
using CTFServer.Models.Request.Info;
using CTFServer.Repositories.Interface;
using CTFServer.Services;
using CTFServer.Utils;
using IdentityModel.OidcClient;
using MemoryPack;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Caching.Distributed;

Expand All @@ -13,14 +16,17 @@ public class GameRepository : RepositoryBase, IGameRepository
private readonly IDistributedCache cache;
private readonly byte[]? xorkey;
private readonly ILogger<GameRepository> logger;
private readonly ChannelWriter<CacheRequest> cacheRequestChannelWriter;

public GameRepository(IDistributedCache _cache,
IConfiguration _configuration,
ILogger<GameRepository> _logger,
ChannelWriter<CacheRequest> _cacheRequestChannelWriter,
AppDbContext _context) : base(_context)
{
cache = _cache;
logger = _logger;
cacheRequestChannelWriter = _cacheRequestChannelWriter;
var xorkeyStr = _configuration["XorKey"];
xorkey = string.IsNullOrEmpty(xorkeyStr) ? null : Encoding.UTF8.GetBytes(xorkeyStr);
}
Expand Down Expand Up @@ -57,11 +63,10 @@ public async Task<BasicGameInfoModel[]> GetBasicGameInfo(int count = 10, int ski
public Task<ScoreboardModel> GetScoreboard(Game game, CancellationToken token = default)
=> cache.GetOrCreateAsync(logger, CacheKey.ScoreBoard(game.Id), entry =>
{
entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(12);
entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(14);
return GenScoreboard(game, token);
}, token);


public async Task DeleteGame(Game game, CancellationToken token = default)
{
context.Remove(game);
Expand All @@ -77,15 +82,15 @@ public Task<Game[]> GetGames(int count, int skip, CancellationToken token)
public void FlushGameInfoCache()
=> cache.Remove(CacheKey.BasicGameInfo);

public void FlushScoreboardCache(int gameId)
=> cache.Remove(CacheKey.ScoreBoard(gameId));
public async Task FlushScoreboardCache(int gameId, CancellationToken token)
=> await cacheRequestChannelWriter.WriteAsync(ScoreboardCacheHandler.MakeCacheRequest(gameId), token);

#region Generate Scoreboard

private record Data(Instance Instance, Submission? Submission);

// By xfoxfu & GZTimeWalker @ 2022/04/03
private async Task<ScoreboardModel> GenScoreboard(Game game, CancellationToken token = default)
public async Task<ScoreboardModel> GenScoreboard(Game game, CancellationToken token = default)
{
var data = await FetchData(game, token);
var bloods = GenBloods(data);
Expand Down Expand Up @@ -280,3 +285,32 @@ private static IEnumerable<TimeLine> GenTimeLine(IEnumerable<ChallengeItem> item

#endregion Generate Scoreboard
}

public class ScoreboardCacheHandler : ICacheRequestHandler
{
public static CacheRequest MakeCacheRequest(int id)
=> new(Utils.CacheKey.ScoreBoardBase, new() { AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(14) }, id.ToString());

public string? CacheKey(CacheRequest request)
{
if (request.Params.Length != 1)
return null;

return Utils.CacheKey.ScoreBoard(request.Params[0]);
}

public async Task<byte[]> Handler(AsyncServiceScope scope, CacheRequest request, CancellationToken token = default)
{
if (!int.TryParse(request.Params[0], out int id))
return Array.Empty<byte>();

var gameRepository = scope.ServiceProvider.GetRequiredService<IGameRepository>();
var game = await gameRepository.GetGameById(id, token);

if (game is null)
return Array.Empty<byte>();

var scoreboard = await gameRepository.GenScoreboard(game, token);
return MemoryPackSerializer.Serialize(scoreboard);
}
}
12 changes: 6 additions & 6 deletions src/GZCTF/Repositories/InstanceRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public InstanceRepository(AppDbContext _context,

if (flags.Count == 0)
{
logger.SystemLog($"题目 {challenge.Title}#{challenge.Id} 请求分配的动态附件数量不足", TaskStatus.Fail, LogLevel.Warning);
logger.SystemLog($"题目 {challenge.Title}#{challenge.Id} 请求分配的动态附件数量不足", TaskStatus.Failed, LogLevel.Warning);
return null;
}

Expand All @@ -86,7 +86,7 @@ public InstanceRepository(AppDbContext _context,
catch
{
retry++;
logger.SystemLog($"题目 {challenge.Title}#{challenge.Id} 分配的动态附件保存失败,重试中:{retry} 次", TaskStatus.Fail, LogLevel.Warning);
logger.SystemLog($"题目 {challenge.Title}#{challenge.Id} 分配的动态附件保存失败,重试中:{retry} 次", TaskStatus.Failed, LogLevel.Warning);
if (retry >= 3)
return null;
await Task.Delay(100, token);
Expand Down Expand Up @@ -123,7 +123,7 @@ public async Task<bool> DestroyContainer(Container container, CancellationToken
}
catch (Exception ex)
{
logger.SystemLog($"销毁容器 [{container.ContainerId[..12]}] ({container.Image.Split("/").LastOrDefault()}): {ex.Message}", TaskStatus.Fail, LogLevel.Warning);
logger.SystemLog($"销毁容器 [{container.ContainerId[..12]}] ({container.Image.Split("/").LastOrDefault()}): {ex.Message}", TaskStatus.Failed, LogLevel.Warning);
return false;
}
}
Expand All @@ -133,7 +133,7 @@ public async Task<TaskResult<Container>> CreateContainer(Instance instance, Team
if (string.IsNullOrEmpty(instance.Challenge.ContainerImage) || instance.Challenge.ContainerExposePort is null)
{
logger.SystemLog($"无法为题目 {instance.Challenge.Title} 启动容器实例", TaskStatus.Denied, LogLevel.Warning);
return new TaskResult<Container>(TaskStatus.Fail);
return new TaskResult<Container>(TaskStatus.Failed);
}

if (await context.Instances.CountAsync(i => i.Participation == instance.Participation
Expand All @@ -159,8 +159,8 @@ public async Task<TaskResult<Container>> CreateContainer(Instance instance, Team

if (container is null)
{
logger.SystemLog($"为题目 {instance.Challenge.Title} 启动容器实例失败", TaskStatus.Fail, LogLevel.Warning);
return new TaskResult<Container>(TaskStatus.Fail);
logger.SystemLog($"为题目 {instance.Challenge.Title} 启动容器实例失败", TaskStatus.Failed, LogLevel.Warning);
return new TaskResult<Container>(TaskStatus.Failed);
}

instance.Container = container;
Expand Down
10 changes: 9 additions & 1 deletion src/GZCTF/Repositories/Interface/IGameRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,15 @@ public interface IGameRepository : IRepository
/// 刷新排行榜
/// </summary>
/// <param name="gameId">比赛Id</param>
public void FlushScoreboardCache(int gameId);
/// <param name="token"></param>
public Task FlushScoreboardCache(int gameId, CancellationToken token);

/// <summary>
/// 生成排行榜
/// </summary>
/// <param name="game">比赛对象</param>
/// <param name="token"></param>
public Task<ScoreboardModel> GenScoreboard(Game game, CancellationToken token = default);

/// <summary>
/// 刷新比赛信息缓存
Expand Down
2 changes: 1 addition & 1 deletion src/GZCTF/Repositories/ParticipationRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public async Task UpdateParticipationStatus(Participation part, ParticipationSta
// will call SaveAsync
if (await EnsureInstances(part, part.Game, token))
// flush scoreboard when instances are updated
gameRepository.FlushScoreboardCache(part.Game.Id);
await gameRepository.FlushScoreboardCache(part.Game.Id, token);
}
// team will unlock automatically when request occur
else
Expand Down
158 changes: 158 additions & 0 deletions src/GZCTF/Services/CacheMaker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
using System.Threading.Channels;
using CTFServer.Repositories;
using CTFServer.Utils;
using Microsoft.Extensions.Caching.Distributed;

namespace CTFServer.Services;

/// <summary>
/// 缓存更新请求
/// </summary>
public class CacheRequest
{
public string Key { get; set; } = string.Empty;
public string[] Params { get; set; } = Array.Empty<string>();
public DistributedCacheEntryOptions? Options { get; set; }

public CacheRequest(string key, DistributedCacheEntryOptions? options = null, params string[] _params)
{
Key = key;
Params = _params;
Options = options;
}
}

/// <summary>
/// 缓存请求处理接口
/// </summary>
public interface ICacheRequestHandler
{
public string? CacheKey(CacheRequest request);
public Task<byte[]> Handler(AsyncServiceScope scope, CacheRequest request, CancellationToken token = default);
}

public class CacheMaker : IHostedService
{
private readonly ILogger<CacheMaker> logger;
private readonly IDistributedCache cache;
private readonly ChannelReader<CacheRequest> channelReader;
private readonly IServiceScopeFactory serviceScopeFactory;
private CancellationTokenSource TokenSource { get; set; } = new CancellationTokenSource();
private Dictionary<string, ICacheRequestHandler> cacheHandlers = new();

public CacheMaker(
ILogger<CacheMaker> _logger,
IDistributedCache _cache,
ChannelReader<CacheRequest> _channelReader,
IServiceScopeFactory _serviceScopeFactory)
{
logger = _logger;
cache = _cache;
channelReader = _channelReader;
serviceScopeFactory = _serviceScopeFactory;
}

public void AddCacheRequestHandler<T>(string key) where T : ICacheRequestHandler, new()
=> cacheHandlers.Add(key, new T());

private async Task Maker(CancellationToken token = default)
{
logger.SystemLog($"缓存更新线程已启动", TaskStatus.Pending, LogLevel.Debug);

try
{
await foreach (var item in channelReader.ReadAllAsync(token))
{
if (!cacheHandlers.ContainsKey(item.Key))
{
logger.SystemLog($"缓存更新线程未找到匹配的请求:{item.Key}", TaskStatus.NotFound, LogLevel.Warning);
continue;
}

var handler = cacheHandlers[item.Key];
var key = handler.CacheKey(item);

if (key is null)
{
logger.SystemLog($"无效的缓存更新请求:{item.Key}", TaskStatus.NotFound, LogLevel.Warning);
continue;
}

var updateLock = $"_CacheUpdateLock_{key}";

logger.SystemLog($"缓存更新线程开始处理更新请求:{key}", TaskStatus.Pending, LogLevel.Debug);

if (await cache.GetAsync(updateLock, token) is not null)
{
// only one GZCTF instance will never encounter this problem
logger.SystemLog($"缓存更新线程已锁定:{key}", TaskStatus.Pending, LogLevel.Debug);
continue;
}

await using var scope = serviceScopeFactory.CreateAsyncScope();

try
{
await cache.SetAsync(updateLock, Array.Empty<byte>(), new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(60)
}, token);

var bytes = await handler.Handler(scope, item, token);

if (bytes is not null && bytes.Length > 0)
{
await cache.SetAsync(key, bytes, item.Options ?? new DistributedCacheEntryOptions(), token);
logger.SystemLog($"缓存已更新:{key} @ {bytes.Length} bytes", TaskStatus.Success, LogLevel.Debug);
}
else
{
logger.SystemLog($"缓存生成失败:{key}", TaskStatus.Failed, LogLevel.Warning);
}
}
catch (Exception e)
{
logger.SystemLog($"缓存更新线程更新失败:{key} @ {e.Message}", TaskStatus.Failed, LogLevel.Error);
}
finally
{
await cache.RemoveAsync(updateLock, token);
}

token.ThrowIfCancellationRequested();
}
}
catch (OperationCanceledException)
{
logger.SystemLog($"任务取消,缓存更新线程将退出", TaskStatus.Exit, LogLevel.Debug);
}
finally
{
logger.SystemLog($"缓存更新线程已退出", TaskStatus.Exit, LogLevel.Debug);
}
}

public Task StartAsync(CancellationToken token)
{
TokenSource = new CancellationTokenSource();

#region Add Handlers

AddCacheRequestHandler<ScoreboardCacheHandler>(CacheKey.ScoreBoardBase);

#endregion

_ = Maker(TokenSource.Token);

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken token)
{
TokenSource.Cancel();

logger.SystemLog("缓存更新已停用", TaskStatus.Success, LogLevel.Debug);

return Task.CompletedTask;
}
}
Loading

0 comments on commit 9ffa58b

Please sign in to comment.