Skip to content

Commit

Permalink
LNK-2801 Add Property to Kafka Messages
Browse files Browse the repository at this point in the history
  • Loading branch information
edward-miller-lcg committed Aug 5, 2024
1 parent c9ef3bb commit 91ca511
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace LantanaGroup.Link.DataAcquisition.Application.Models.Kafka;

public class ResourceAcquired
{
public bool AcquisitionComplete { get; set; } = false;
public string PatientId { get; set; }
public string QueryType { get; set; }
public Resource Resource { get; set; }
Expand Down
27 changes: 27 additions & 0 deletions DotNet/DataAcquisition/Application/Services/PatientDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,40 @@ await _queryListProcessor.Process(
}
catch (Exception ex)
{
//produce tailing message
await ProduceTailingMessage(request.FacilityId, request.CorrelationId, patientId, dataAcqRequested.QueryType, dataAcqRequested.ScheduledReports, cancellationToken);

var message =
$"Error retrieving data from EHR for facility: {request.FacilityId}\n{ex.Message}\n{ex.InnerException}";
_logger.LogError(message);
throw;
}
}
}

//produce tailing message to indicate acquisition is complete
await ProduceTailingMessage(request.FacilityId, request.CorrelationId, patientId, dataAcqRequested.QueryType, dataAcqRequested.ScheduledReports, cancellationToken);
}

private async Task ProduceTailingMessage(string facilityId, string correlationId, string patientId, string queryType, List<ScheduledReport> scheduledReports, CancellationToken cancellationToken)
{
await _kafkaProducer.ProduceAsync(
KafkaTopic.ResourceAcquired.ToString(),
new Message<string, ResourceAcquired>
{
Key = facilityId,
Headers = new Headers
{
new Header(DataAcquisitionConstants.HeaderNames.CorrelationId, Encoding.UTF8.GetBytes(correlationId))
},
Value = new ResourceAcquired
{
AcquisitionComplete = true,
PatientId = patientId,
QueryType = queryType,
ScheduledReports = scheduledReports
}
}, cancellationToken);
}

private static string TEMPORARYPatientIdPart(string fullPatientUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace LantanaGroup.Link.Normalization.Application.Models.Messages;

public class ResourceAcquiredMessage
{
public bool AcquisitionComplete { get; set; } = false;
public string PatientId { get; set; }
public string QueryType { get; set; }
public object Resource { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace LantanaGroup.Link.Normalization.Application.Models.Messages;

public class ResourceNormalizedMessage
{
public bool AcquisitionComplete { get; set; } = false;
public string PatientId { get; set; }
public string QueryType { get; set; }
public object Resource { get; set; }
Expand Down
38 changes: 34 additions & 4 deletions DotNet/Normalization/Listeners/ResourceAcquiredListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ await kafkaConsumer.ConsumeWithInstrumentation(async (result, CancellationToken)
throw new DeadLetterException("Message Key (FacilityId) is null or empty.");
}

if (message.Value == null || message.Value.Resource == null || string.IsNullOrWhiteSpace(message.Value.QueryType) || message.Value.ScheduledReports == null)
if (
message.Message.Value == null
|| ((message.Message.Value.Resource == null
|| string.IsNullOrWhiteSpace(message.Message.Value.QueryType)
|| message.Message.Value.ScheduledReports == null)
&& !message.Message.Value.AcquisitionComplete)
)
{
throw new DeadLetterException("Bad message with one of the followign reasons: \n* Null Message \n* Null Resource \n* No QueryType \n* No Scheduled Reports. Skipping message.");
}
Expand All @@ -119,6 +125,30 @@ await kafkaConsumer.ConsumeWithInstrumentation(async (result, CancellationToken)
throw new DeadLetterException("Failed to extract FacilityId and CorrelationId from message.", ex);
}

if (message.Message.Value.AcquisitionComplete && message.Message.Value.Resource == null)
{
_logger.LogInformation("Acquisition Complete tail message received. Producing message for measure eval.");
var headers = new Headers
{
new Header(NormalizationConstants.HeaderNames.CorrelationId, Encoding.UTF8.GetBytes(messageMetaData.correlationId))
};
var resourceNormalizedMessage = new ResourceNormalizedMessage
{
AcquisitionComplete = message.Message.Value.AcquisitionComplete,
PatientId = message.Message.Value.PatientId ?? "",
QueryType = message.Message.Value.QueryType,
ScheduledReports = message.Message.Value.ScheduledReports
};
Message<string, ResourceNormalizedMessage> produceMessage = new Message<string, ResourceNormalizedMessage>
{
Key = messageMetaData.facilityId,
Headers = headers,
Value = resourceNormalizedMessage
};
await kafkaProducer.ProduceAsync(KafkaTopic.ResourceNormalized.ToString(), produceMessage);
return;
}

NormalizationConfig? config = null;
try
{
Expand Down Expand Up @@ -277,10 +307,10 @@ await _auditService.TriggerAuditEvent(new TriggerAuditEventCommand
};
var resourceNormalizedMessage = new ResourceNormalizedMessage
{
PatientId = message.Value.PatientId ?? "",
AcquisitionComplete = message.Message.Value.AcquisitionComplete,
PatientId = message.Message.Value.PatientId ?? "",
Resource = serializedResource,

QueryType = message.Value.QueryType,
QueryType = message.Message.Value.QueryType,
ScheduledReports = message.Message.Value.ScheduledReports
};
Message<string, ResourceNormalizedMessage> produceMessage = new Message<string, ResourceNormalizedMessage>
Expand Down

0 comments on commit 91ca511

Please sign in to comment.