Skip to content

Commit

Permalink
Merge branch 'dev' into LNK-2723-Query-Dispatch-refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
arianamihailescu authored Aug 1, 2024
2 parents 9bd8335 + 87c5934 commit 06c7cbb
Show file tree
Hide file tree
Showing 19 changed files with 380 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ public FhirQueryConfigurationManager(IDatabase database, ILogger<FhirQueryConfig
throw new NotFoundException($"No configuration found for facilityId: {facilityId}. Unable to retrieve Authentication settings.");
}

if (queryResult.Authentication == null)
{
throw new NotFoundException($"No Authentication found on configuration for facilityId: {facilityId}. Unable to retrieve Authentication settings.");
}

return queryResult.Authentication;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

namespace LantanaGroup.Link.DataAcquisition.Application.Models;

public record FacilityConnectionResult(bool IsConnected, bool IsPatientFound, string? ErrorMessage = null, Patient? patient = null);
public record FacilityConnectionResult(bool IsConnected, bool IsPatientFound, string? ErrorMessage = null, List<Resource>? results = null);
54 changes: 52 additions & 2 deletions DotNet/DataAcquisition/Application/Services/PatientDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace LantanaGroup.Link.DataAcquisition.Application.Services;
public interface IPatientDataService
{
Task Get(GetPatientDataRequest request, CancellationToken cancellationToken);
Task<List<Resource>> Get_NoKafka(GetPatientDataRequest request, CancellationToken cancellationToken = default);
}

public class PatientDataService : IPatientDataService
Expand Down Expand Up @@ -55,6 +56,55 @@ public PatientDataService(
_queryListProcessor = queryListProcessor ?? throw new ArgumentNullException(nameof(queryListProcessor));
}

public async Task<List<Resource>> Get_NoKafka(GetPatientDataRequest request, CancellationToken cancellationToken = default)
{
var authenticationConfig = await _fhirQueryManager.GetAuthenticationConfigurationByFacilityId(request.FacilityId, cancellationToken);
var queryConfig = await _fhirQueryManager.GetAsync(request.FacilityId, cancellationToken);
var patient = await _fhirRepo.GetPatient(
queryConfig.FhirServerBaseUrl,
request.ConsumeResult.Value.PatientId,
Guid.NewGuid().ToString(),
request.FacilityId,
authenticationConfig,
cancellationToken) ?? throw new NotFoundException("Patient not found.");
var queryPlan = (
await _queryPlanManager.FindAsync(
q => q.FacilityId.ToLower() == request.FacilityId.ToLower()
&& q.PlanName.ToLower() == request.ConsumeResult.Value.ScheduledReports.FirstOrDefault().ReportType.ToLower(), cancellationToken))
.FirstOrDefault();

if (queryPlan == null)
throw new MissingFacilityConfigurationException("Query Plan not found.");

var resources = new List<Resource>();

var initialQueries = queryPlan.InitialQueries.OrderBy(x => x.Key);
var supplementalQueries = queryPlan.SupplementalQueries.OrderBy(x => x.Key);

var referenceTypes = queryPlan.InitialQueries.Values.OfType<ReferenceQueryConfig>().Select(x => x.ResourceType).Distinct().ToList();
referenceTypes.AddRange(queryPlan.SupplementalQueries.Values.OfType<ReferenceQueryConfig>().Select(x => x.ResourceType).Distinct().ToList());

resources.AddRange(await _queryListProcessor.Process_NoKafka(
queryPlan.InitialQueries.OrderBy(x => x.Key),
request,
queryConfig,
request.ConsumeResult.Value.ScheduledReports.FirstOrDefault(),
queryPlan,
referenceTypes,
QueryPlanType.Initial.ToString()));

resources.AddRange(await _queryListProcessor.Process_NoKafka(
queryPlan.SupplementalQueries.OrderBy(x => x.Key),
request,
queryConfig,
request.ConsumeResult.Value.ScheduledReports.FirstOrDefault(),
queryPlan,
referenceTypes,
QueryPlanType.Supplemental.ToString()));

return resources;
}

public async Task Get(GetPatientDataRequest request, CancellationToken cancellationToken)
{
var dataAcqRequested = request.ConsumeResult.Message.Value;
Expand Down Expand Up @@ -121,7 +171,7 @@ await _kafkaProducer.ProduceAsync(
foreach (var scheduledReport in dataAcqRequested.ScheduledReports)
{
var queryPlan = queryPlans.FirstOrDefault(x => x.ReportType == scheduledReport.ReportType);

if (queryPlan != null)
{
var initialQueries = queryPlan.InitialQueries.OrderBy(x => x.Key);
Expand All @@ -143,7 +193,7 @@ await _queryListProcessor.Process(
dataAcqRequested.QueryType.Equals("Initial", StringComparison.InvariantCultureIgnoreCase) ? QueryPlanType.Initial.ToString() : QueryPlanType.Supplemental.ToString(), cancellationToken);

}
catch(ProduceException<string, ResourceAcquired>)
catch (ProduceException<string, ResourceAcquired>)
{
throw;
}
Expand Down
98 changes: 98 additions & 0 deletions DotNet/DataAcquisition/Application/Services/QueryListProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ namespace LantanaGroup.Link.DataAcquisition.Application.Services;

public interface IQueryListProcessor
{
Task<List<Resource>> Process_NoKafka(
IOrderedEnumerable<KeyValuePair<string, IQueryConfig>> queryList,
GetPatientDataRequest request,
FhirQueryConfiguration fhirQueryConfiguration,
ScheduledReport scheduledReport,
QueryPlan queryPlan,
List<string> referenceTypes,
string queryPlanType,
CancellationToken cancellationToken = default
);

Task Process(IOrderedEnumerable<KeyValuePair<string, IQueryConfig>> queryList,
GetPatientDataRequest request,
FhirQueryConfiguration fhirQueryConfiguration,
Expand Down Expand Up @@ -53,6 +64,93 @@ public QueryListProcessor(
_producerConfig.CompressionType = CompressionType.Zstd;
}

public async Task<List<Resource>> Process_NoKafka(
IOrderedEnumerable<KeyValuePair<string, IQueryConfig>> queryList,
GetPatientDataRequest request,
FhirQueryConfiguration fhirQueryConfiguration,
ScheduledReport scheduledReport,
QueryPlan queryPlan,
List<string> referenceTypes,
string queryPlanType,
CancellationToken cancellationToken = default
)
{
var resources = new List<Resource>();
List<ResourceReference> referenceResources = new List<ResourceReference>();
foreach (var query in queryList)
{
var queryConfig = query.Value;
QueryFactoryResult builtQuery = queryConfig switch
{
ParameterQueryConfig => ParameterQueryFactory.Build((ParameterQueryConfig)queryConfig, request,
scheduledReport, queryPlan.LookBack),
ReferenceQueryConfig => ReferenceQueryFactory.Build((ReferenceQueryConfig)queryConfig, referenceResources),
_ => throw new Exception("Unable to identify type for query operation."),
};

_logger.LogInformation("Processing Query for {QueryType}", builtQuery.GetType().Name);

if (builtQuery.GetType() == typeof(SingularParameterQueryFactoryResult))
{
var queryInfo = (ParameterQueryConfig)queryConfig;
_logger.LogInformation("Resource: {1}", queryInfo.ResourceType);

var bundle = await _fhirRepo.GetSingularBundledResultsAsync(
fhirQueryConfiguration.FhirServerBaseUrl,
request.ConsumeResult.Message.Value.PatientId,
request.CorrelationId,
request.FacilityId,
queryPlanType,
(SingularParameterQueryFactoryResult)builtQuery,
(ParameterQueryConfig)queryConfig,
scheduledReport,
fhirQueryConfiguration.Authentication);

referenceResources.AddRange(ReferenceResourceBundleExtractor.Extract(bundle, referenceTypes));
resources.AddRange(bundle.Entry.Select(e => e.Resource));
}

if (builtQuery.GetType() == typeof(PagedParameterQueryFactoryResult))
{
var queryInfo = (ParameterQueryConfig)queryConfig;
_logger.LogInformation("Resource: {1}", queryInfo.ResourceType);

var bundle = await _fhirRepo.GetPagedBundledResultsAsync(
fhirQueryConfiguration.FhirServerBaseUrl,
request.ConsumeResult.Message.Value.PatientId,
request.CorrelationId,
request.FacilityId,
queryPlanType,
(PagedParameterQueryFactoryResult)builtQuery,
(ParameterQueryConfig)queryConfig,
scheduledReport,
fhirQueryConfiguration.Authentication);

referenceResources.AddRange(ReferenceResourceBundleExtractor.Extract(bundle, referenceTypes));
resources.AddRange(bundle.Entry.Select(e => e.Resource));
}

if (builtQuery.GetType() == typeof(ReferenceQueryFactoryResult))
{
var referenceQueryFactoryResult = (ReferenceQueryFactoryResult)builtQuery;

var queryInfo = (ReferenceQueryConfig)queryConfig;
_logger.LogInformation("Resource: {1}", queryInfo.ResourceType);

var results = await _referenceResourceService.Execute_NoKafka(
referenceQueryFactoryResult,
request,
fhirQueryConfiguration,
queryInfo,
queryPlanType);

resources.AddRange(results);
}
}

return resources;
}

public async Task Process(
IOrderedEnumerable<KeyValuePair<string, IQueryConfig>> queryList,
GetPatientDataRequest request,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Confluent.Kafka;
using Hl7.Fhir.Model;
using Hl7.Fhir.Serialization;
using LantanaGroup.Link.DataAcquisition.Application.Interfaces;
using LantanaGroup.Link.DataAcquisition.Application.Models;
using LantanaGroup.Link.DataAcquisition.Application.Models.Factory.ReferenceQuery;
using LantanaGroup.Link.DataAcquisition.Application.Models.Kafka;
Expand All @@ -12,6 +13,7 @@
using LantanaGroup.Link.DataAcquisition.Domain.Models.QueryConfig;
using LantanaGroup.Link.DataAcquisition.Domain.Settings;
using LantanaGroup.Link.Shared.Application.Models;
using LantanaGroup.Link.Shared.Application.Models.Telemetry;
using System.Text;
using System.Text.Json;
using Task = System.Threading.Tasks.Task;
Expand All @@ -20,6 +22,14 @@ namespace LantanaGroup.Link.DataAcquisition.Application.Services;

public interface IReferenceResourceService
{
Task<List<Resource>> Execute_NoKafka(
ReferenceQueryFactoryResult referenceQueryFactoryResult,
GetPatientDataRequest request,
FhirQueryConfiguration fhirQueryConfiguration,
ReferenceQueryConfig referenceQueryConfig,
string queryPlanType,
CancellationToken cancellationToken = default);

Task Execute(
ReferenceQueryFactoryResult referenceQueryFactoryResult,
GetPatientDataRequest request,
Expand All @@ -36,19 +46,65 @@ public class ReferenceResourceService : IReferenceResourceService
private readonly IQueriedFhirResourceManager _queriedFhirResourceManager;
private readonly IFhirApiService _fhirRepo;
private readonly IProducer<string, ResourceAcquired> _kafkaProducer;
private readonly IDataAcquisitionServiceMetrics _metrics;

public ReferenceResourceService(
ILogger<ReferenceResourceService> logger,
IReferenceResourcesManager referenceResourcesManager,
IFhirApiService fhirRepo,
IProducer<string, ResourceAcquired> kafkaProducer,
IQueriedFhirResourceManager queriedFhirResourceManager)
IQueriedFhirResourceManager queriedFhirResourceManager,
IDataAcquisitionServiceMetrics metrics)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_referenceResourcesManager = referenceResourcesManager ?? throw new ArgumentNullException(nameof(referenceResourcesManager));
_fhirRepo = fhirRepo ?? throw new ArgumentNullException(nameof(fhirRepo));
_kafkaProducer = kafkaProducer ?? throw new ArgumentNullException(nameof(kafkaProducer));
_queriedFhirResourceManager = queriedFhirResourceManager ?? throw new ArgumentNullException(nameof(queriedFhirResourceManager));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
}

public async Task<List<Resource>> Execute_NoKafka(ReferenceQueryFactoryResult referenceQueryFactoryResult, GetPatientDataRequest request, FhirQueryConfiguration fhirQueryConfiguration, ReferenceQueryConfig referenceQueryConfig, string queryPlanType, CancellationToken cancellationToken = default)
{
var resources = new List<Resource>();
if (referenceQueryFactoryResult.ReferenceIds?.Count == 0)
{
return resources;
}

var validReferenceResources =
referenceQueryFactoryResult
?.ReferenceIds
?.Where(x => x.TypeName == referenceQueryConfig.ResourceType || x.Reference.StartsWith(referenceQueryConfig.ResourceType, StringComparison.InvariantCultureIgnoreCase))
.ToList();

var existingReferenceResources =
await _referenceResourcesManager.GetReferenceResourcesForListOfIds(
validReferenceResources.Select(x => x.Reference.SplitReference()).ToList(),
request.FacilityId);

resources.AddRange(existingReferenceResources.Select(x => FhirResourceDeserializer.DeserializeFhirResource(x)));

List<ResourceReference> missingReferences = validReferenceResources
.Where(x => !existingReferenceResources.Any(y => y.ResourceId == x.Reference.SplitReference())).ToList();

foreach(var x in missingReferences)
{
var fullMissingResources = await _fhirRepo.GetReferenceResource(
fhirQueryConfiguration.FhirServerBaseUrl,
referenceQueryFactoryResult.ResourceType,
request.ConsumeResult.Message.Value.PatientId,
request.FacilityId,
request.CorrelationId,
queryPlanType,
x,
referenceQueryConfig,
fhirQueryConfiguration.Authentication);

resources.AddRange(fullMissingResources);
}

return resources;
}

public async Task Execute(
Expand Down Expand Up @@ -88,7 +144,7 @@ await _queriedFhirResourceManager.AddAsync(new QueriedFhirResourceRecord
ResourceType = referenceQueryFactoryResult.ResourceType,
CreateDate = DateTime.UtcNow,
ModifyDate = DateTime.UtcNow
});
});

await GenerateMessage(
FhirResourceDeserializer.DeserializeFhirResource(existingReference),
Expand All @@ -97,8 +153,11 @@ await GenerateMessage(
queryPlanType,
request.CorrelationId,
request.ConsumeResult.Message.Value.ScheduledReports);
}


// Increment metric for resource acquired
IncrementResourceAcquiredMetric(request.CorrelationId, request.ConsumeResult.Message.Value.PatientId.SplitReference(), request.FacilityId,
queryPlanType, referenceQueryFactoryResult.ResourceType, existingReference.ResourceId);
}

List<ResourceReference> missingReferences = validReferenceResources
.Where(x => !existingReferenceResources.Any(y => y.ResourceId.Equals(x.Reference.SplitReference(), StringComparison.InvariantCultureIgnoreCase))).ToList();
Expand Down Expand Up @@ -189,4 +248,16 @@ await _kafkaProducer.ProduceAsync(
});

}

private void IncrementResourceAcquiredMetric(string? correlationId, string? patientIdReference, string? facilityId, string? queryType, string resourceType, string resourceId)
{
_metrics.IncrementResourceAcquiredCounter([
new KeyValuePair<string, object?>(DiagnosticNames.CorrelationId, correlationId),
new KeyValuePair<string, object?>(DiagnosticNames.FacilityId, facilityId),
new KeyValuePair<string, object?>(DiagnosticNames.PatientId, patientIdReference), //TODO: Can we keep this?
new KeyValuePair<string, object?>(DiagnosticNames.QueryType, queryType),
new KeyValuePair<string, object?>(DiagnosticNames.Resource, resourceType),
new KeyValuePair<string, object?>(DiagnosticNames.ResourceId, resourceId)
]);
}
}
Loading

0 comments on commit 06c7cbb

Please sign in to comment.