diff --git a/DotNet/DataAcquisition/Application/Models/Kafka/ResourceAcquired.cs b/DotNet/DataAcquisition/Application/Models/Kafka/ResourceAcquired.cs index a1088a589..76adecf2e 100644 --- a/DotNet/DataAcquisition/Application/Models/Kafka/ResourceAcquired.cs +++ b/DotNet/DataAcquisition/Application/Models/Kafka/ResourceAcquired.cs @@ -4,6 +4,7 @@ namespace LantanaGroup.Link.DataAcquisition.Application.Models.Kafka; public class ResourceAcquired { + public bool AcquisitionComplete { get; set; } = false; public string PatientId { get; set; } public string QueryType { get; set; } public Resource Resource { get; set; } diff --git a/DotNet/DataAcquisition/Application/Services/PatientDataService.cs b/DotNet/DataAcquisition/Application/Services/PatientDataService.cs index ddfee60b7..bcbdd13e8 100644 --- a/DotNet/DataAcquisition/Application/Services/PatientDataService.cs +++ b/DotNet/DataAcquisition/Application/Services/PatientDataService.cs @@ -199,6 +199,9 @@ await _queryListProcessor.Process( } catch (Exception ex) { + //produce tailing message + await ProduceTailingMessage(request.FacilityId, request.CorrelationId, patientId, dataAcqRequested.QueryType, dataAcqRequested.ScheduledReports, cancellationToken); + var message = $"Error retrieving data from EHR for facility: {request.FacilityId}\n{ex.Message}\n{ex.InnerException}"; _logger.LogError(message); @@ -206,6 +209,30 @@ await _queryListProcessor.Process( } } } + + //produce tailing message to indicate acquisition is complete + await ProduceTailingMessage(request.FacilityId, request.CorrelationId, patientId, dataAcqRequested.QueryType, dataAcqRequested.ScheduledReports, cancellationToken); + } + + private async Task ProduceTailingMessage(string facilityId, string correlationId, string patientId, string queryType, List scheduledReports, CancellationToken cancellationToken) + { + await _kafkaProducer.ProduceAsync( + KafkaTopic.ResourceAcquired.ToString(), + new Message + { + Key = facilityId, + Headers = new Headers + { + new Header(DataAcquisitionConstants.HeaderNames.CorrelationId, Encoding.UTF8.GetBytes(correlationId)) + }, + Value = new ResourceAcquired + { + AcquisitionComplete = true, + PatientId = patientId, + QueryType = queryType, + ScheduledReports = scheduledReports + } + }, cancellationToken); } private static string TEMPORARYPatientIdPart(string fullPatientUrl) diff --git a/DotNet/Normalization/Application/Models/Messages/ResourceAcquiredMessage.cs b/DotNet/Normalization/Application/Models/Messages/ResourceAcquiredMessage.cs index f642fc0da..a616a664b 100644 --- a/DotNet/Normalization/Application/Models/Messages/ResourceAcquiredMessage.cs +++ b/DotNet/Normalization/Application/Models/Messages/ResourceAcquiredMessage.cs @@ -4,6 +4,7 @@ namespace LantanaGroup.Link.Normalization.Application.Models.Messages; public class ResourceAcquiredMessage { + public bool AcquisitionComplete { get; set; } = false; public string PatientId { get; set; } public string QueryType { get; set; } public object Resource { get; set; } diff --git a/DotNet/Normalization/Application/Models/Messages/ResourceNormalizedMessage.cs b/DotNet/Normalization/Application/Models/Messages/ResourceNormalizedMessage.cs index 6dfbc1f09..9a9e7f6cc 100644 --- a/DotNet/Normalization/Application/Models/Messages/ResourceNormalizedMessage.cs +++ b/DotNet/Normalization/Application/Models/Messages/ResourceNormalizedMessage.cs @@ -5,6 +5,7 @@ namespace LantanaGroup.Link.Normalization.Application.Models.Messages; public class ResourceNormalizedMessage { + public bool AcquisitionComplete { get; set; } = false; public string PatientId { get; set; } public string QueryType { get; set; } public object Resource { get; set; } diff --git a/DotNet/Normalization/Listeners/ResourceAcquiredListener.cs b/DotNet/Normalization/Listeners/ResourceAcquiredListener.cs index 263a61d74..33c8b8fd0 100644 --- a/DotNet/Normalization/Listeners/ResourceAcquiredListener.cs +++ b/DotNet/Normalization/Listeners/ResourceAcquiredListener.cs @@ -104,7 +104,13 @@ await kafkaConsumer.ConsumeWithInstrumentation(async (result, CancellationToken) throw new DeadLetterException("Message Key (FacilityId) is null or empty."); } - if (message.Value == null || message.Value.Resource == null || string.IsNullOrWhiteSpace(message.Value.QueryType) || message.Value.ScheduledReports == null) + if ( + message.Message.Value == null + || ((message.Message.Value.Resource == null + || string.IsNullOrWhiteSpace(message.Message.Value.QueryType) + || message.Message.Value.ScheduledReports == null) + && !message.Message.Value.AcquisitionComplete) + ) { throw new DeadLetterException("Bad message with one of the followign reasons: \n* Null Message \n* Null Resource \n* No QueryType \n* No Scheduled Reports. Skipping message."); } @@ -119,6 +125,30 @@ await kafkaConsumer.ConsumeWithInstrumentation(async (result, CancellationToken) throw new DeadLetterException("Failed to extract FacilityId and CorrelationId from message.", ex); } + if (message.Message.Value.AcquisitionComplete && message.Message.Value.Resource == null) + { + _logger.LogInformation("Acquisition Complete tail message received. Producing message for measure eval."); + var headers = new Headers + { + new Header(NormalizationConstants.HeaderNames.CorrelationId, Encoding.UTF8.GetBytes(messageMetaData.correlationId)) + }; + var resourceNormalizedMessage = new ResourceNormalizedMessage + { + AcquisitionComplete = message.Message.Value.AcquisitionComplete, + PatientId = message.Message.Value.PatientId ?? "", + QueryType = message.Message.Value.QueryType, + ScheduledReports = message.Message.Value.ScheduledReports + }; + Message produceMessage = new Message + { + Key = messageMetaData.facilityId, + Headers = headers, + Value = resourceNormalizedMessage + }; + await kafkaProducer.ProduceAsync(KafkaTopic.ResourceNormalized.ToString(), produceMessage); + return; + } + NormalizationConfig? config = null; try { @@ -277,10 +307,10 @@ await _auditService.TriggerAuditEvent(new TriggerAuditEventCommand }; var resourceNormalizedMessage = new ResourceNormalizedMessage { - PatientId = message.Value.PatientId ?? "", + AcquisitionComplete = message.Message.Value.AcquisitionComplete, + PatientId = message.Message.Value.PatientId ?? "", Resource = serializedResource, - - QueryType = message.Value.QueryType, + QueryType = message.Message.Value.QueryType, ScheduledReports = message.Message.Value.ScheduledReports }; Message produceMessage = new Message