LNK-3007: Testing/Integration Testing - select/store data based on fa… #594

Merged · 4 commits · Dec 30, 2024
@@ -13,72 +13,86 @@ namespace LantanaGroup.Link.LinkAdmin.BFF.Application.Commands.Integration

     public class KafkaConsumerManager
     {
-        private readonly List<(IConsumer<Ignore, string>, CancellationTokenSource)> _consumers;
+        private readonly List<(IConsumer<string, string>, CancellationTokenSource)> _consumers;
         private readonly KafkaConnection _kafkaConnection;
         private readonly KafkaConsumerService _kafkaConsumerService;
         private readonly IOptions<CacheSettings> _cacheSettings;
         private readonly IServiceScopeFactory _serviceScopeFactory;

+        private readonly static string errorTopic = "-Error";
+        public static readonly string delimitator = ":";

         // construct a list of topics
         private List<(string, string)> kafkaTopics = new List<(string, string)>
         {
             ("ReportScheduledDyn", KafkaTopic.ReportScheduled.ToString()),
-            ("ReportScheduledDyn", KafkaTopic.ReportScheduled.ToString()+"-Error"),
+            ("ReportScheduledDyn", KafkaTopic.ReportScheduled.ToString() + errorTopic),
             ("CensusDyn", KafkaTopic.PatientIDsAcquired.ToString()),
-            ("CensusDyn", KafkaTopic.PatientIDsAcquired.ToString()+"-Error"),
+            ("CensusDyn", KafkaTopic.PatientIDsAcquired.ToString() + errorTopic),
             ("QueryDispatchDyn", KafkaTopic.PatientEvent.ToString()),
-            ("QueryDispatchDyn", KafkaTopic.PatientEvent.ToString()+"-Error"),
+            ("QueryDispatchDyn", KafkaTopic.PatientEvent.ToString() + errorTopic),
             ("DataAcquisitionDyn", KafkaTopic.DataAcquisitionRequested.ToString()),
-            ("DataAcquisitionDyn", KafkaTopic.DataAcquisitionRequested.ToString()+"-Error"),
+            ("DataAcquisitionDyn", KafkaTopic.DataAcquisitionRequested.ToString() + errorTopic),
             ("AcquiredDyn", KafkaTopic.ResourceAcquired.ToString()),
-            ("AcquiredDyn", KafkaTopic.ResourceAcquired.ToString()+"-Error"),
+            ("AcquiredDyn", KafkaTopic.ResourceAcquired.ToString() + errorTopic),
             ("NormalizationDyn", KafkaTopic.ResourceNormalized.ToString()),
-            ("NormalizationDyn", KafkaTopic.ResourceNormalized.ToString()+"-Error"),
-            ("ReportDyn", KafkaTopic.SubmitReport.ToString()),
-            ("ReportDyn", KafkaTopic.SubmitReport.ToString()+"-Error"),
+            ("NormalizationDyn", KafkaTopic.ResourceNormalized.ToString() + errorTopic),
             ("ResourceEvaluatedDyn", KafkaTopic.ResourceEvaluated.ToString()),
-            ("ResourceEvaluatedDyn", KafkaTopic.ResourceEvaluated.ToString()+"-Error"),
+            ("ResourceEvaluatedDyn", KafkaTopic.ResourceEvaluated.ToString() + errorTopic),
+            ("ReportDyn", KafkaTopic.SubmitReport.ToString()),
+            ("ReportDyn", KafkaTopic.SubmitReport.ToString() + errorTopic),
         };

         // Add constructor
         public KafkaConsumerManager(KafkaConsumerService kafkaConsumerService, IOptions<Shared.Application.Models.Configs.CacheSettings> cacheSettings, IServiceScopeFactory serviceScopeFactory, KafkaConnection kafkaConnection)
         {
             _kafkaConsumerService = kafkaConsumerService;
             _cacheSettings = cacheSettings ?? throw new ArgumentNullException(nameof(cacheSettings));
             _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
-            _consumers = new List<(IConsumer<Ignore, string>, CancellationTokenSource)>();
+            _consumers = new List<(IConsumer<string, string>, CancellationTokenSource)>();
             _kafkaConnection = kafkaConnection ?? throw new ArgumentNullException(nameof(_kafkaConnection));
         }

-        public void CreateAllConsumers()
+        private IServiceScope ClearRedisCache(string facility)
         {
             // clear Redis cache
-            using var scope = _serviceScopeFactory.CreateScope();
+            var scope = _serviceScopeFactory.CreateScope();

             var _cache = scope.ServiceProvider.GetRequiredService<IDistributedCache>();

             foreach (var topic in kafkaTopics)
             {
-                String key = topic.Item2;
-                _cache.Remove(key);
+                String redisKey = topic.Item2 + delimitator + facility;
+                _cache.Remove(redisKey);
             }
+            return scope;
+        }


+        // loop through the list of topics and create a consumer for each
+        public void CreateAllConsumers(string facility)
+        {
+            //clear Redis cache for that facility
+            ClearRedisCache(facility);

             // start consumers
             foreach (var topic in kafkaTopics)
             {
-                if (topic.Item1 != "")
+                if (topic.Item2 != string.Empty)
                 {
-                    CreateConsumer(topic.Item1, topic.Item2);
+                    CreateConsumer(topic.Item1, topic.Item2, facility);
                 }
             }
         }

-        public void CreateConsumer(string groupId, string topic)
+        public void CreateConsumer(string groupId, string topic, string facility)
         {
             var cts = new CancellationTokenSource();
             var config = new ConsumerConfig
@@ -96,43 +110,47 @@ public void CreateConsumer(string groupId, string topic)
                 config.SaslPassword = _kafkaConnection.SaslPassword;
             }

-            var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
+            var consumer = new ConsumerBuilder<string, string>(config).Build();

             _consumers.Add((consumer, cts));

-            Task.Run(() => _kafkaConsumerService.StartConsumer(groupId, topic, consumer, cts.Token));
+            Task.Run(() => _kafkaConsumerService.StartConsumer(groupId, topic, facility, consumer, cts.Token));
         }

-        public async Task StopAllConsumers()
-        {
-            foreach (var consumerTuple in _consumers)
-            {
-                consumerTuple.Item2.Cancel();
-            }
-
-            _consumers.Clear();
-        }

-        public Dictionary<string, string> readAllConsumers()
+        public Dictionary<string, string> readAllConsumers(string facility)
         {
             Dictionary<string, string> correlationIds = new Dictionary<string, string>();

             using var scope = _serviceScopeFactory.CreateScope();
             var _cache = scope.ServiceProvider.GetRequiredService<IDistributedCache>();
-            // loop through the list of topics and get the correlation id for each
+            // loop through the redis keys for that facility and get the correlation id for each
             foreach (var topic in kafkaTopics)
             {
-                if (topic.Item2 != "")
+                if (topic.Item2 != string.Empty)
                 {
-                    string key = topic.Item2;
-                    correlationIds.Add(key, _cache.GetString(key));
+                    string redisKey = topic.Item2 + delimitator + facility;
+                    correlationIds.Add(topic.Item2, _cache.GetString(redisKey));
                 }
             }
             return correlationIds;
         }

+        public void StopAllConsumers(string facility)
+        {
+            // stop consumers
+            foreach (var consumerTuple in _consumers)
+            {
+                consumerTuple.Item2.Cancel();
+            }
+            _consumers.Clear();
+
+            //clear Redis cache for that facility
+            ClearRedisCache(facility);
+        }
     }
 }
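Note on how the pieces above fit together: each consumer now records the correlation IDs it sees under a per-facility Redis key built as topic + delimitator + facilityId, and readAllConsumers reads those same keys back. The snippet below is a minimal, self-contained sketch of that key scheme and of the append logic in StartConsumer; the helper and class names are invented for illustration, while the ":" delimitator and the JSON-list-per-key layout come from this diff.

// Illustrative sketch only (not part of the PR): per-facility key scheme and the
// correlation-id list handling used by the consumers. Requires the Newtonsoft.Json package.
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

public static class FacilityKeySketch
{
    public const string Delimitator = ":";

    // e.g. "ReportScheduled-Error" + ":" + "TestFacility01" -> "ReportScheduled-Error:TestFacility01"
    public static string BuildRedisKey(string topic, string facilityId) => topic + Delimitator + facilityId;

    // Mirrors StartConsumer: deserialize the existing JSON list (if any), add the id once, reserialize.
    public static string AppendCorrelationId(string existingJson, string correlationId)
    {
        var list = existingJson != null
            ? JsonConvert.DeserializeObject<List<string>>(existingJson) ?? new List<string>()
            : new List<string>();
        if (!list.Contains(correlationId))
        {
            list.Add(correlationId);
        }
        return JsonConvert.SerializeObject(list);
    }

    public static void Main()
    {
        var key = BuildRedisKey("ReportScheduled", "TestFacility01");
        var json = AppendCorrelationId(null, "3f2c9d1e-0001");
        Console.WriteLine($"{key} => {json}"); // ReportScheduled:TestFacility01 => ["3f2c9d1e-0001"]
    }
}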
@@ -3,6 +3,8 @@
 using Microsoft.Extensions.Caching.Distributed;
 using Microsoft.Extensions.Options;
 using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using System.Text.RegularExpressions;

 namespace LantanaGroup.Link.LinkAdmin.BFF.Application.Commands.Integration
@@ -13,15 +15,22 @@ public class KafkaConsumerService
         private readonly IServiceScopeFactory _serviceScopeFactory;
         private readonly ILogger<KafkaConsumerService> _logger;

         public KafkaConsumerService( IOptions<CacheSettings> cacheSettings, IServiceScopeFactory serviceScopeFactory, ILogger<KafkaConsumerService> logger)
         {
             _cacheSettings = cacheSettings ?? throw new ArgumentNullException(nameof(cacheSettings));
             _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
             _logger = logger ?? throw new ArgumentNullException(nameof(logger));
         }

-        public void StartConsumer(string groupId, string topic, IConsumer<Ignore, string> consumer, CancellationToken cancellationToken)
+        public void StartConsumer(string groupId, string topic, string facility, IConsumer<string, string> consumer, CancellationToken cancellationToken)
         {
+            // get the Redis cache
+            using var scope = _serviceScopeFactory.CreateScope();
+            var _cache = scope.ServiceProvider.GetRequiredService<IDistributedCache>();

             using (consumer)
             {
                 consumer.Subscribe(topic);
@@ -36,22 +45,34 @@ public void StartConsumer(string groupId, string topic, IConsumer<Ignore, string
                         if (consumeResult.Message.Headers.TryGetLastBytes("X-Correlation-Id", out var headerValue))
                         {
                             correlationId = System.Text.Encoding.UTF8.GetString(headerValue);
-                            using var scope = _serviceScopeFactory.CreateScope();
-                            var _cache = scope.ServiceProvider.GetRequiredService<IDistributedCache>();
+                            string consumeResultFacility = this.extractFacility(consumeResult.Message.Key);

+                            if(facility != consumeResultFacility)
+                            {
+                                // _logger.LogInformation("Searched Facility ID {facility} does not match message facility {consumeResultFacility}. Skipping message.", facility, consumeResultFacility);
+                                continue;
+                            }

+                            // read the list from Redis
+                            var redisKey = topic + KafkaConsumerManager.delimitator + facility;
+                            string retrievedListJson = _cache.GetString(redisKey);

                             var retrievedList = new List<string>();
-                            string retrievedListJson = _cache.GetString(topic);
                             if (retrievedListJson != null) {
                                 retrievedList = JsonConvert.DeserializeObject<List<string>>(retrievedListJson);
                             }
                             // append the new correlation id to the existing list
                             if (!retrievedList.Contains(correlationId))
                             {
                                 retrievedList.Add(correlationId);
                             }

                             string serializedList = JsonConvert.SerializeObject(retrievedList);

-                            _cache.SetString(topic, serializedList);
+                            // store the list back in Redis
+                            _cache.SetString(redisKey, serializedList);
                         }
                         _logger.LogInformation("Consumed message '{MessageValue}' from topic {Topic}, partition {Partition}, offset {Offset}, correlation {CorrelationId}",consumeResult.Message.Value, consumeResult.Topic, consumeResult.Partition, consumeResult.Offset, correlationId);
                     }
@@ -71,5 +92,35 @@ public void StartConsumer(string groupId, string topic, IConsumer<Ignore, string
                 }
             }
         }

+        public string extractFacility(string kafkaKey)
+        {
+            if (string.IsNullOrEmpty(kafkaKey))
+            {
+                return "";
+            }
+
+            // Try to parse the key as JSON
+            try
+            {
+                var jsonObject = JObject.Parse(kafkaKey);
+                var matchingProperty = jsonObject.Properties().FirstOrDefault(p => Regex.IsMatch(p.Name, "facility", RegexOptions.IgnoreCase));
+
+                if (matchingProperty != null)
+                {
+                    return matchingProperty.Value.ToString();
+                }
+                else
+                {
+                    return "";
+                }
+            }
+            catch (JsonReaderException)
+            {
+                // If parsing fails, treat it as a plain string
+                return kafkaKey;
+            }
+        }
     }

 }
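For reviewers, a rough sketch of how extractFacility above is expected to behave for the key shapes the consumers are likely to see. It reuses the same JObject/Regex approach as the method in this diff; the sample keys are invented for illustration.

// Illustrative sketch only: expected extractFacility behaviour for JSON keys,
// plain-string keys, and JSON keys without a facility property. Sample keys are invented.
using System;
using System.Linq;
using System.Text.RegularExpressions;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static class ExtractFacilityDemo
{
    public static string ExtractFacility(string kafkaKey)
    {
        if (string.IsNullOrEmpty(kafkaKey)) return "";
        try
        {
            var jsonObject = JObject.Parse(kafkaKey);
            var match = jsonObject.Properties()
                .FirstOrDefault(p => Regex.IsMatch(p.Name, "facility", RegexOptions.IgnoreCase));
            return match != null ? match.Value.ToString() : "";
        }
        catch (JsonReaderException)
        {
            // Not JSON: treat the whole key as the facility id.
            return kafkaKey;
        }
    }

    public static void Main()
    {
        Console.WriteLine(ExtractFacility("{\"FacilityId\":\"TestFacility01\"}")); // TestFacility01
        Console.WriteLine(ExtractFacility("TestFacility01"));                      // TestFacility01
        Console.WriteLine(ExtractFacility("{\"PatientId\":\"P-001\"}"));           // (empty string)
    }
}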
14 changes: 14 additions & 0 deletions DotNet/LinkAdmin.BFF/Application/Models/Integration/Facility.cs
@@ -0,0 +1,14 @@
+using LantanaGroup.Link.LinkAdmin.BFF.Application.Interfaces.Models;
+
+namespace LantanaGroup.Link.LinkAdmin.BFF.Application.Models.Integration
+{
+    public class Facility
+    {
+        /// <summary>
+        /// Key for the patient event (FacilityId)
+        /// </summary>
+        /// <example>TestFacility01</example>
+        public string FacilityId { get; set; } = string.Empty;
+
+    }
+}
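The new Facility model is what the reworked endpoints below bind from the request body. A quick, hedged check of the JSON shape it round-trips to, assuming the host's default camelCase JSON options (the casing is not shown anywhere in this diff):

// Illustrative sketch only: the request body shape the Facility model corresponds to.
// Assumes default ASP.NET Core camelCase serialization; adjust if the host configures otherwise.
using System;
using System.Text.Json;

public class Facility
{
    public string FacilityId { get; set; } = string.Empty;
}

public static class FacilityBodyDemo
{
    public static void Main()
    {
        var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
        var json = JsonSerializer.Serialize(new Facility { FacilityId = "TestFacility01" }, options);
        Console.WriteLine(json); // {"facilityId":"TestFacility01"}

        var roundTripped = JsonSerializer.Deserialize<Facility>(json, options);
        Console.WriteLine(roundTripped?.FacilityId); // TestFacility01
    }
}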
@@ -90,8 +90,9 @@ public void RegisterEndpoints(WebApplication app)
             });

-            integrationEndpoints.MapPost("/start-consumers", (Delegate)CreateConsumersRequested)
+            integrationEndpoints.MapPost("/start-consumers", CreateConsumersRequested)
                 .Produces<EventProducerResponse>(StatusCodes.Status200OK)
+                .Produces<ValidationFailureResponse>(StatusCodes.Status400BadRequest)
                 .Produces(StatusCodes.Status401Unauthorized)
                 .ProducesProblem(StatusCodes.Status500InternalServerError)
                 .WithOpenApi(x => new OpenApiOperation(x)
@@ -100,7 +101,7 @@ public void RegisterEndpoints(WebApplication app)
                     Description = "Integration Testing - Starts consumers"
                 });

-            integrationEndpoints.MapGet("/read-consumers", (Delegate)ReadConsumersRequested)
+            integrationEndpoints.MapPost("/read-consumers", ReadConsumersRequested)
                 .Produces<Dictionary<string, string>>(StatusCodes.Status200OK)
                 .Produces(StatusCodes.Status401Unauthorized)
                 .ProducesProblem(StatusCodes.Status500InternalServerError)
@@ -111,7 +112,7 @@ public void RegisterEndpoints(WebApplication app)
                 });

-            integrationEndpoints.MapGet("/stop-consumers", (Delegate)DeleteConsumersRequested)
+            integrationEndpoints.MapPost("/stop-consumers", DeleteConsumersRequested)
                 .Produces<object>(StatusCodes.Status200OK)
                 .Produces(StatusCodes.Status401Unauthorized)
                 .ProducesProblem(StatusCodes.Status500InternalServerError)
@@ -127,29 +128,20 @@ public void RegisterEndpoints(WebApplication app)

         }

-        public Task CreateConsumersRequested(HttpContext context)
+        public Task CreateConsumersRequested(HttpContext context, Facility facility)
         {
-            _kafkaConsumerManager.CreateAllConsumers();
+            _kafkaConsumerManager.CreateAllConsumers(facility.FacilityId);
             return Task.CompletedTask;
         }

-        public async Task<IResult> ReadConsumersRequested(HttpContext context)
+        public async Task<IResult> ReadConsumersRequested(HttpContext context, Facility facility)
         {
-            Dictionary<string, string> list = _kafkaConsumerManager.readAllConsumers();
-            // construct response
-            //ConsumerResponse response = new ConsumerResponse();
-            /*foreach (var item in list)
-            {
-                ConsumerResponseTopic resp = new ConsumerResponseTopic();
-                resp.topic = item.Key;
-                resp.correlationId = item.Value;
-                response.list.Add(resp);
-            }*/
+            Dictionary<string, string> list = _kafkaConsumerManager.readAllConsumers(facility.FacilityId);
             return Results.Ok(list);
         }
-        public async Task<Task> DeleteConsumersRequested(HttpContext context)
+        public Task DeleteConsumersRequested(HttpContext context, Facility facility)
         {
-            await _kafkaConsumerManager.StopAllConsumers();
+            _kafkaConsumerManager.StopAllConsumers(facility.FacilityId);
             return Task.CompletedTask;
         }
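Since all three routes are now POSTs that take a Facility body, here is a hedged end-to-end usage sketch for driving them from a test client. The base address and the "/api/integration" route prefix are assumptions, not taken from this diff, and auth is omitted.

// Illustrative sketch only: driving the integration-test endpoints from a test client.
// Base address, route prefix, and auth handling are assumptions, not taken from this PR.
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading.Tasks;

public static class IntegrationTestDriver
{
    public static async Task Main()
    {
        using var client = new HttpClient { BaseAddress = new Uri("https://localhost:7777") };
        var body = new { facilityId = "TestFacility01" };

        // Start the per-topic consumers for this facility (also clears its Redis keys).
        await client.PostAsJsonAsync("/api/integration/start-consumers", body);

        // ... produce test events for the facility, then read the captured correlation ids per topic.
        var response = await client.PostAsJsonAsync("/api/integration/read-consumers", body);
        var perTopic = await response.Content.ReadFromJsonAsync<Dictionary<string, string>>();
        foreach (var entry in perTopic ?? new Dictionary<string, string>())
        {
            Console.WriteLine($"{entry.Key}: {entry.Value}"); // value is the JSON list of correlation ids
        }

        // Stop the consumers and clear the facility's cache entries.
        await client.PostAsJsonAsync("/api/integration/stop-consumers", body);
    }
}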
