From a77507983bbbab1fd69a4b8d1cf0987ec210904a Mon Sep 17 00:00:00 2001 From: Ariana D Mihailescu <82962995+arianamihailescu@users.noreply.github.com> Date: Mon, 30 Dec 2024 13:26:51 -0500 Subject: [PATCH 1/2] LNK-3007: Testing/Integration Testing - select/store data based on facility --- .../Integration/KafkaConsumerManager.cs | 102 ++++++++++-------- .../Integration/KafkaConsumerService.cs | 63 +++++++++-- .../Models/Integration/Facility.cs | 14 +++ .../Endpoints/IntegrationTestingEndpoints.cs | 28 ++--- 4 files changed, 141 insertions(+), 66 deletions(-) create mode 100644 DotNet/LinkAdmin.BFF/Application/Models/Integration/Facility.cs diff --git a/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs b/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs index 60296d19..b28e1ce9 100644 --- a/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs +++ b/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs @@ -13,72 +13,86 @@ namespace LantanaGroup.Link.LinkAdmin.BFF.Application.Commands.Integration public class KafkaConsumerManager { - - private readonly List<(IConsumer, CancellationTokenSource)> _consumers; + + + private readonly List<(IConsumer, CancellationTokenSource)> _consumers; private readonly KafkaConnection _kafkaConnection; private readonly KafkaConsumerService _kafkaConsumerService; private readonly IOptions _cacheSettings; private readonly IServiceScopeFactory _serviceScopeFactory; + + private readonly static string errorTopic = "-Error"; + public static readonly string delimitator = ":"; + // construct a list of topics private List<(string, string)> kafkaTopics = new List<(string, string)> { ("ReportScheduledDyn", KafkaTopic.ReportScheduled.ToString()), - ("ReportScheduledDyn", KafkaTopic.ReportScheduled.ToString()+"-Error"), + ("ReportScheduledDyn", KafkaTopic.ReportScheduled.ToString() + errorTopic), ("CensusDyn", KafkaTopic.PatientIDsAcquired.ToString()), - ("CensusDyn", KafkaTopic.PatientIDsAcquired.ToString()+"-Error"), + ("CensusDyn", KafkaTopic.PatientIDsAcquired.ToString() + errorTopic), ("QueryDispatchDyn", KafkaTopic.PatientEvent.ToString()), - ("QueryDispatchDyn", KafkaTopic.PatientEvent.ToString()+"-Error"), + ("QueryDispatchDyn", KafkaTopic.PatientEvent.ToString() + errorTopic), ("DataAcquisitionDyn", KafkaTopic.DataAcquisitionRequested.ToString()), - ("DataAcquisitionDyn", KafkaTopic.DataAcquisitionRequested.ToString()+"-Error"), + ("DataAcquisitionDyn", KafkaTopic.DataAcquisitionRequested.ToString() + errorTopic), ("AcquiredDyn", KafkaTopic.ResourceAcquired.ToString()), - ("AcquiredDyn", KafkaTopic.ResourceAcquired.ToString()+"-Error"), + ("AcquiredDyn", KafkaTopic.ResourceAcquired.ToString() + errorTopic), ("NormalizationDyn", KafkaTopic.ResourceNormalized.ToString()), - ("NormalizationDyn", KafkaTopic.ResourceNormalized.ToString()+"-Error"), - ("ReportDyn", KafkaTopic.SubmitReport.ToString()), - ("ReportDyn", KafkaTopic.SubmitReport.ToString()+"-Error"), + ("NormalizationDyn", KafkaTopic.ResourceNormalized.ToString() + errorTopic), ("ResourceEvaluatedDyn", KafkaTopic.ResourceEvaluated.ToString()), - ("ResourceEvaluatedDyn", KafkaTopic.ResourceEvaluated.ToString()+"-Error"), + ("ResourceEvaluatedDyn", KafkaTopic.ResourceEvaluated.ToString() + errorTopic), + ("ReportDyn", KafkaTopic.SubmitReport.ToString()), + ("ReportDyn", KafkaTopic.SubmitReport.ToString() + errorTopic), }; + // Add constructor public KafkaConsumerManager(KafkaConsumerService kafkaConsumerService, IOptions cacheSettings, IServiceScopeFactory serviceScopeFactory, KafkaConnection kafkaConnection) { _kafkaConsumerService = kafkaConsumerService; _cacheSettings = cacheSettings ?? throw new ArgumentNullException(nameof(cacheSettings)); _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory)); - _consumers = new List<(IConsumer, CancellationTokenSource)>(); + _consumers = new List<(IConsumer, CancellationTokenSource)>(); _kafkaConnection = kafkaConnection ?? throw new ArgumentNullException(nameof(_kafkaConnection)); } - public void CreateAllConsumers() + + private IServiceScope ClearRedisCache(string facility) { // clear Redis cache - using var scope = _serviceScopeFactory.CreateScope(); + var scope = _serviceScopeFactory.CreateScope(); var _cache = scope.ServiceProvider.GetRequiredService(); foreach (var topic in kafkaTopics) - { + { { - String key = topic.Item2; - _cache.Remove(key); + String redisKey = topic.Item2 + delimitator + facility; + _cache.Remove(redisKey); } } + return scope; + } + - // loop through the list of topics and create a consumer for each + public void CreateAllConsumers(string facility) + { + //clear Redis cache for that facility + ClearRedisCache(facility); + + // start consumers foreach (var topic in kafkaTopics) { - if (topic.Item1 != "") + if (topic.Item2 != string.Empty) { - CreateConsumer(topic.Item1, topic.Item2); + CreateConsumer(topic.Item1, topic.Item2, facility); } - } - } - public void CreateConsumer(string groupId, string topic) + + public void CreateConsumer(string groupId, string topic, string facility) { var cts = new CancellationTokenSource(); var config = new ConsumerConfig @@ -96,43 +110,47 @@ public void CreateConsumer(string groupId, string topic) config.SaslPassword = _kafkaConnection.SaslPassword; } - var consumer = new ConsumerBuilder(config).Build(); + var consumer = new ConsumerBuilder(config).Build(); _consumers.Add((consumer, cts)); - Task.Run(() => _kafkaConsumerService.StartConsumer(groupId, topic, consumer, cts.Token)); + Task.Run(() => _kafkaConsumerService.StartConsumer(groupId, topic, facility, consumer, cts.Token)); } - public async Task StopAllConsumers() - { - - foreach (var consumerTuple in _consumers) - { - consumerTuple.Item2.Cancel(); - } - _consumers.Clear(); - - } - - public Dictionary readAllConsumers() + public Dictionary readAllConsumers(string facility) { Dictionary correlationIds = new Dictionary(); using var scope = _serviceScopeFactory.CreateScope(); var _cache = scope.ServiceProvider.GetRequiredService(); - // loop through the list of topics and get the correlation id for each + // loop through the redis keys for that facility and get the correlation id for each foreach (var topic in kafkaTopics) { - if (topic.Item2 != "") + if (topic.Item2 != string.Empty) { - string key = topic.Item2; - - correlationIds.Add(key, _cache.GetString(key)); + string redisKey = topic.Item2 + delimitator + facility; + correlationIds.Add(topic.Item2, _cache.GetString(redisKey)); } } return correlationIds; } + + public async Task StopAllConsumers(string facility) + { + // stop consumers + foreach (var consumerTuple in _consumers) + { + consumerTuple.Item2.Cancel(); + } + _consumers.Clear(); + + //clear Redis cache for that facility + ClearRedisCache(facility); + + } } + + } diff --git a/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerService.cs b/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerService.cs index 2cc85d07..762f6d96 100644 --- a/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerService.cs +++ b/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerService.cs @@ -3,6 +3,8 @@ using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.Options; using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System.Text.RegularExpressions; namespace LantanaGroup.Link.LinkAdmin.BFF.Application.Commands.Integration @@ -13,6 +15,8 @@ public class KafkaConsumerService private readonly IServiceScopeFactory _serviceScopeFactory; private readonly ILogger _logger; + + public KafkaConsumerService( IOptions cacheSettings, IServiceScopeFactory serviceScopeFactory, ILogger logger) { _cacheSettings = cacheSettings ?? throw new ArgumentNullException(nameof(cacheSettings)); @@ -20,8 +24,13 @@ public KafkaConsumerService( IOptions cacheSettings, IServiceScop _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } - public void StartConsumer(string groupId, string topic, IConsumer consumer, CancellationToken cancellationToken) + public void StartConsumer(string groupId, string topic, string facility, IConsumer consumer, CancellationToken cancellationToken) { + + // get the Redis cache + using var scope = _serviceScopeFactory.CreateScope(); + var _cache = scope.ServiceProvider.GetRequiredService(); + using (consumer) { consumer.Subscribe(topic); @@ -36,14 +45,23 @@ public void StartConsumer(string groupId, string topic, IConsumer(); - // append the new correlation id to the existing list + string consumeResultFacility = this.extractFacility(consumeResult.Message.Key); + + if(facility != consumeResultFacility) + { + // _logger.LogInformation("Searched Facility ID {facility} does not match message facility {consumeResultFacility}. Skipping message.", facility, consumeResultFacility); + continue; + } + // read the list from Redis + + var redisKey = topic + KafkaConsumerManager.delimitator + facility; + string retrievedListJson = _cache.GetString(redisKey); + var retrievedList = new List(); - string retrievedListJson = _cache.GetString(topic); if (retrievedListJson != null) { retrievedList = JsonConvert.DeserializeObject>(retrievedListJson); } + // append the new correlation id to the existing list if (!retrievedList.Contains(correlationId)) { retrievedList.Add(correlationId); @@ -51,7 +69,10 @@ public void StartConsumer(string groupId, string topic, IConsumer Regex.IsMatch(p.Name, "facility", RegexOptions.IgnoreCase)); + + if (matchingProperty != null) + { + return matchingProperty.Value.ToString(); + } + else + { + return ""; + } + } + catch (JsonReaderException) + { + // If parsing fails, treat it as a plain string + return kafkaKey; + } + } } + } diff --git a/DotNet/LinkAdmin.BFF/Application/Models/Integration/Facility.cs b/DotNet/LinkAdmin.BFF/Application/Models/Integration/Facility.cs new file mode 100644 index 00000000..b4d0c05e --- /dev/null +++ b/DotNet/LinkAdmin.BFF/Application/Models/Integration/Facility.cs @@ -0,0 +1,14 @@ +using LantanaGroup.Link.LinkAdmin.BFF.Application.Interfaces.Models; + +namespace LantanaGroup.Link.LinkAdmin.BFF.Application.Models.Integration +{ + public class Facility + { + /// + /// Key for the patient event (FacilityId) + /// + /// TestFacility01 + public string FacilityId { get; set; } = string.Empty; + + } +} diff --git a/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs b/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs index 30fda886..7b61883d 100644 --- a/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs +++ b/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs @@ -90,8 +90,9 @@ public void RegisterEndpoints(WebApplication app) }); - integrationEndpoints.MapPost("/start-consumers", (Delegate)CreateConsumersRequested) + integrationEndpoints.MapPost("/start-consumers", CreateConsumersRequested) .Produces(StatusCodes.Status200OK) + .Produces(StatusCodes.Status400BadRequest) .Produces(StatusCodes.Status401Unauthorized) .ProducesProblem(StatusCodes.Status500InternalServerError) .WithOpenApi(x => new OpenApiOperation(x) @@ -100,7 +101,7 @@ public void RegisterEndpoints(WebApplication app) Description = "Integration Testing - Starts consumers" }); - integrationEndpoints.MapGet("/read-consumers", (Delegate)ReadConsumersRequested) + integrationEndpoints.MapPost("/read-consumers", ReadConsumersRequested) .Produces>(StatusCodes.Status200OK) .Produces(StatusCodes.Status401Unauthorized) .ProducesProblem(StatusCodes.Status500InternalServerError) @@ -111,7 +112,7 @@ public void RegisterEndpoints(WebApplication app) }); - integrationEndpoints.MapGet("/stop-consumers", (Delegate)DeleteConsumersRequested) + integrationEndpoints.MapPost("/stop-consumers", DeleteConsumersRequested) .Produces(StatusCodes.Status200OK) .Produces(StatusCodes.Status401Unauthorized) .ProducesProblem(StatusCodes.Status500InternalServerError) @@ -127,29 +128,20 @@ public void RegisterEndpoints(WebApplication app) } - public Task CreateConsumersRequested(HttpContext context) + public Task CreateConsumersRequested(HttpContext context, Facility facility) { - _kafkaConsumerManager.CreateAllConsumers(); + _kafkaConsumerManager.CreateAllConsumers(facility.FacilityId); return Task.CompletedTask; } - public async Task ReadConsumersRequested(HttpContext context) + public async Task ReadConsumersRequested(HttpContext context, Facility facility) { - Dictionary list = _kafkaConsumerManager.readAllConsumers(); - // construct response - //ConsumerResponse response = new ConsumerResponse(); - /*foreach (var item in list) - { - ConsumerResponseTopic resp = new ConsumerResponseTopic(); - resp.topic = item.Key; - resp.correlationId = item.Value; - response.list.Add(resp); - }*/ + Dictionary list = _kafkaConsumerManager.readAllConsumers(facility.FacilityId); return Results.Ok(list); } - public async Task DeleteConsumersRequested(HttpContext context) + public async Task DeleteConsumersRequested(HttpContext context, Facility facility) { - await _kafkaConsumerManager.StopAllConsumers(); + await _kafkaConsumerManager.StopAllConsumers(facility.FacilityId); return Task.CompletedTask; } From 8586ea24d20c45d0f2015b7178789430ee0bb6f8 Mon Sep 17 00:00:00 2001 From: Ariana D Mihailescu <82962995+arianamihailescu@users.noreply.github.com> Date: Mon, 30 Dec 2024 14:05:09 -0500 Subject: [PATCH 2/2] LNK-3007: Testing/Integration Testing - select/store data based on facility --- .../Application/Commands/Integration/KafkaConsumerManager.cs | 2 +- .../Presentation/Endpoints/IntegrationTestingEndpoints.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs b/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs index b28e1ce9..54d91778 100644 --- a/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs +++ b/DotNet/LinkAdmin.BFF/Application/Commands/Integration/KafkaConsumerManager.cs @@ -137,7 +137,7 @@ public Dictionary readAllConsumers(string facility) return correlationIds; } - public async Task StopAllConsumers(string facility) + public void StopAllConsumers(string facility) { // stop consumers foreach (var consumerTuple in _consumers) diff --git a/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs b/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs index 7b61883d..899fb3b2 100644 --- a/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs +++ b/DotNet/LinkAdmin.BFF/Presentation/Endpoints/IntegrationTestingEndpoints.cs @@ -139,9 +139,9 @@ public async Task ReadConsumersRequested(HttpContext context, Facility Dictionary list = _kafkaConsumerManager.readAllConsumers(facility.FacilityId); return Results.Ok(list); } - public async Task DeleteConsumersRequested(HttpContext context, Facility facility) + public Task DeleteConsumersRequested(HttpContext context, Facility facility) { - await _kafkaConsumerManager.StopAllConsumers(facility.FacilityId); + _kafkaConsumerManager.StopAllConsumers(facility.FacilityId); return Task.CompletedTask; }