Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LNK-2801 Add Property to Kafka Messages #419

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace LantanaGroup.Link.DataAcquisition.Application.Models.Kafka;

public class ResourceAcquired
{
public bool AcquisitionComplete { get; set; } = false;
public string PatientId { get; set; }
public string QueryType { get; set; }
public Resource Resource { get; set; }
Expand Down
27 changes: 27 additions & 0 deletions DotNet/DataAcquisition/Application/Services/PatientDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,40 @@ await _queryListProcessor.Process(
}
catch (Exception ex)
{
//produce tailing message
await ProduceTailingMessage(request.FacilityId, request.CorrelationId, patientId, dataAcqRequested.QueryType, dataAcqRequested.ScheduledReports, cancellationToken);

var message =
$"Error retrieving data from EHR for facility: {request.FacilityId}\n{ex.Message}\n{ex.InnerException}";
_logger.LogError(message);
throw;
}
}
}

//produce tailing message to indicate acquisition is complete
await ProduceTailingMessage(request.FacilityId, request.CorrelationId, patientId, dataAcqRequested.QueryType, dataAcqRequested.ScheduledReports, cancellationToken);
}

private async Task ProduceTailingMessage(string facilityId, string correlationId, string patientId, string queryType, List<ScheduledReport> scheduledReports, CancellationToken cancellationToken)
{
await _kafkaProducer.ProduceAsync(
KafkaTopic.ResourceAcquired.ToString(),
new Message<string, ResourceAcquired>
{
Key = facilityId,
Headers = new Headers
{
new Header(DataAcquisitionConstants.HeaderNames.CorrelationId, Encoding.UTF8.GetBytes(correlationId))
},
Value = new ResourceAcquired
{
AcquisitionComplete = true,
PatientId = patientId,
QueryType = queryType,
ScheduledReports = scheduledReports
}
}, cancellationToken);
}

private static string TEMPORARYPatientIdPart(string fullPatientUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace LantanaGroup.Link.Normalization.Application.Models.Messages;

public class ResourceAcquiredMessage
{
public bool AcquisitionComplete { get; set; } = false;
public string PatientId { get; set; }
public string QueryType { get; set; }
public object Resource { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace LantanaGroup.Link.Normalization.Application.Models.Messages;

public class ResourceNormalizedMessage
{
public bool AcquisitionComplete { get; set; } = false;
public string PatientId { get; set; }
public string QueryType { get; set; }
public object Resource { get; set; }
Expand Down
38 changes: 34 additions & 4 deletions DotNet/Normalization/Listeners/ResourceAcquiredListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ await kafkaConsumer.ConsumeWithInstrumentation(async (result, CancellationToken)
throw new DeadLetterException("Message Key (FacilityId) is null or empty.");
}

if (message.Value == null || message.Value.Resource == null || string.IsNullOrWhiteSpace(message.Value.QueryType) || message.Value.ScheduledReports == null)
if (
message.Message.Value == null
|| ((message.Message.Value.Resource == null
|| string.IsNullOrWhiteSpace(message.Message.Value.QueryType)
|| message.Message.Value.ScheduledReports == null)
&& !message.Message.Value.AcquisitionComplete)
)
{
throw new DeadLetterException("Bad message with one of the followign reasons: \n* Null Message \n* Null Resource \n* No QueryType \n* No Scheduled Reports. Skipping message.");
}
Expand Down Expand Up @@ -137,6 +143,30 @@ await kafkaConsumer.ConsumeWithInstrumentation(async (result, CancellationToken)
throw new DeadLetterException("An error was encountered retrieving facility configuration.", ex);
}

if (message.Message.Value.AcquisitionComplete && message.Message.Value.Resource == null)
{
_logger.LogInformation("Acquisition Complete tail message received. Producing message for measure eval.");
var headers = new Headers
{
new Header(NormalizationConstants.HeaderNames.CorrelationId, Encoding.UTF8.GetBytes(messageMetaData.correlationId))
};
var resourceNormalizedMessage = new ResourceNormalizedMessage
{
AcquisitionComplete = message.Message.Value.AcquisitionComplete,
PatientId = message.Message.Value.PatientId ?? "",
QueryType = message.Message.Value.QueryType,
ScheduledReports = message.Message.Value.ScheduledReports
};
Message<string, ResourceNormalizedMessage> produceMessage = new Message<string, ResourceNormalizedMessage>
{
Key = messageMetaData.facilityId,
Headers = headers,
Value = resourceNormalizedMessage
};
await kafkaProducer.ProduceAsync(KafkaTopic.ResourceNormalized.ToString(), produceMessage);
return;
}

edward-miller-lcg marked this conversation as resolved.
Show resolved Hide resolved
if (config.OperationSequence == null || config.OperationSequence.Count == 0)
{
throw new DeadLetterException("No operation sequence found for facility.");
Expand Down Expand Up @@ -277,10 +307,10 @@ await _auditService.TriggerAuditEvent(new TriggerAuditEventCommand
};
var resourceNormalizedMessage = new ResourceNormalizedMessage
{
PatientId = message.Value.PatientId ?? "",
AcquisitionComplete = message.Message.Value.AcquisitionComplete,
PatientId = message.Message.Value.PatientId ?? "",
Resource = serializedResource,

QueryType = message.Value.QueryType,
QueryType = message.Message.Value.QueryType,
ScheduledReports = message.Message.Value.ScheduledReports
};
Message<string, ResourceNormalizedMessage> produceMessage = new Message<string, ResourceNormalizedMessage>
Expand Down
Loading