diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs index a2801865..46ef8496 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs @@ -16,5 +16,10 @@ public enum ResourceOperation /// FHIR resource updated /// Updated, + + /// + /// FHIR resource no operation performed + /// + NoOperation, } } diff --git a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs index 4c505838..f3347259 100644 --- a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs +++ b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs @@ -72,45 +72,70 @@ public virtual async Task SaveObservationAsync(ILookupTemplate - .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) - .RetryAsync(2, async (polyRes, attempt) => + var policyResult = await Policy<(Model.Observation observation, ResourceOperation operationType)> + .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) + .RetryAsync(2, async (polyRes, attempt) => + { + // 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request. + // This can happen if 2 independent processes are updating the same Observation simultaneously. + // or + // The update operation failed because the Observation no longer exists. + // This can happen if a cached Observation was deleted from the FHIR Server. + + _logger.LogTrace("A conflict or precondition caused an Observation update to fail. Getting the most recent Observation."); + + // Attempt to get the most recent version of the Observation. + existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false); + + // If the Observation no longer exists on the FHIR Server, it was most likely deleted. + if (existingObservation == null) { - existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false); - }) - .ExecuteAndCaptureAsync(async () => + _logger.LogTrace("A cached version of an Observation was deleted. Creating a new Observation."); + + // Remove the Observation from the cache (this version no longer exists on the FHIR Server. + _observationCache.Remove(cacheKey); + } + }) + .ExecuteAndCaptureAsync(async () => + { + if (existingObservation == null) { - var mergedObservation = MergeObservation(config, existingObservation, observationGroup); - return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false); - }).ConfigureAwait(false); + var newObservation = GenerateObservation(config, observationGroup, identifier, ids); + return (await _client.CreateAsync(newObservation).ConfigureAwait(false), ResourceOperation.Created); + } - var exception = policyResult.FinalException; + // Merge the new data with the existing Observation. + var mergedObservation = MergeObservation(config, existingObservation, observationGroup); - if (exception != null) - { - throw exception; - } + // Check to see if there are any changes after merging. + if (mergedObservation.IsExactly(existingObservation)) + { + // There are no changes to the Observation - Do not update. + return (existingObservation, ResourceOperation.NoOperation); + } + + // Update the Observation. Some failures will be handled in the RetryAsync block above. + return (await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false), ResourceOperation.Updated); + }).ConfigureAwait(false); - result = policyResult.Result; - _logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Updated), 1); + var exception = policyResult.FinalException; + + if (exception != null) + { + throw exception; } + var observation = policyResult.Result.observation; + + _logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, policyResult.Result.operationType), 1); + _observationCache.CreateEntry(cacheKey) .SetAbsoluteExpiration(DateTimeOffset.UtcNow.AddHours(1)) .SetSize(1) - .SetValue(result) + .SetValue(observation) .Dispose(); - return result.Id; + return observation.Id; } public virtual Model.Observation GenerateObservation(ILookupTemplate config, IObservationGroup grp, Model.Identifier observationId, IDictionary ids) diff --git a/test/Microsoft.Health.Fhir.R4.Ingest.UnitTests/Service/R4FhirImportServiceTests.cs b/test/Microsoft.Health.Fhir.R4.Ingest.UnitTests/Service/R4FhirImportServiceTests.cs index 634fe6f5..19b17ec9 100644 --- a/test/Microsoft.Health.Fhir.R4.Ingest.UnitTests/Service/R4FhirImportServiceTests.cs +++ b/test/Microsoft.Health.Fhir.R4.Ingest.UnitTests/Service/R4FhirImportServiceTests.cs @@ -9,7 +9,9 @@ using System.Threading.Tasks; using Hl7.Fhir.Rest; using Microsoft.Extensions.Caching.Memory; +using Microsoft.Health.Common.Telemetry; using Microsoft.Health.Fhir.Ingest.Data; +using Microsoft.Health.Fhir.Ingest.Telemetry; using Microsoft.Health.Fhir.Ingest.Template; using Microsoft.Health.Logging.Telemetry; using Microsoft.Health.Tests.Common; @@ -87,6 +89,7 @@ public async void GivenNotFoundObservation_WhenSaveObservationAsync_ThenCreateIn handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Post)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1); } [Fact] @@ -121,7 +124,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok var templateProcessor = Substitute.For, Model.Observation>>() .Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation())) - .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation)); + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation() { Id = "2" })); var cache = Substitute.For(); var config = Substitute.For>(); @@ -135,6 +138,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1); } [Fact] @@ -168,7 +172,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn // Mock search and update request var handler = Utilities.CreateMockMessageHandler() - .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle2)) + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle1)) .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException(), x => savedObservation)); var fhirClient = Utilities.CreateMockFhirClient(handler); @@ -195,6 +199,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn templateProcessor.ReceivedWithAnyArgs(2).MergeObservation(default, default, default); handler.Received(2).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); handler.Received(2).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1); } [Fact] @@ -233,6 +238,143 @@ public void GivenValidTemplate_WhenGenerateObservation_ExpectedReferencesSet_Tes }); } + [Fact] + public async Task GivenCachedObservationDeleted_WhenGenerateObservation_ThenCacheIsUpdatedAndCreateInvoked_Test() + { + var savedObservation = new Model.Observation { Id = "1" }; + + // Mock the cached Observation + var cache = Substitute.For(); + cache.TryGetValue(Arg.Any(), out Model.Observation observation) + .Returns(x => + { + x[1] = new Model.Observation(); + return true; + }); + + // Mock update request + var handler = Utilities.CreateMockMessageHandler() + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException())) + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(new Model.Bundle())) + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Post)).Returns(savedObservation)); + + var fhirClient = Utilities.CreateMockFhirClient(handler); + + var ids = BuildIdCollection(); + var identityService = Substitute.For() + .Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids))); + + var observationGroup = Substitute.For(); + + var templateProcessor = Substitute.For, Model.Observation>>() + .Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation())) + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation { Id = "2" })); + + var config = Substitute.For>(); + + var logger = Substitute.For(); + + var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger); + + var result = await service.SaveObservationAsync(config, observationGroup, ids); + + var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created); + + templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); + cache.Received(1).Remove(Arg.Any()); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Post)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1); + cache.Received(1).Set(Arg.Any(), savedObservation); + } + + [Fact] + public async Task GivenCachedObservationUnchanged_WhenGenerateObservation_ThenCacheNoOperation_Test() + { + var cachedObservation = new Model.Observation { Id = "1" }; + + // Mock the cached Observation + var cache = Substitute.For(); + cache.TryGetValue(Arg.Any(), out Model.Observation observation) + .Returns(x => + { + x[1] = cachedObservation; + return true; + }); + + var fhirClient = Utilities.CreateMockFhirClient(); + + var ids = BuildIdCollection(); + var identityService = Substitute.For() + .Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids))); + + var observationGroup = Substitute.For(); + + var templateProcessor = Substitute.For, Model.Observation>>() + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(cachedObservation)); + + var config = Substitute.For>(); + + var logger = Substitute.For(); + + var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger); + + var result = await service.SaveObservationAsync(config, observationGroup, ids); + + var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created); + + templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1); + cache.Received(1).Set(Arg.Any(), cachedObservation); + } + + [Fact] + public async void GivenFoundObservationUnchanged_WhenSaveObservationAsync_ThenUpdateInvoked_Test() + { + var foundObservation = new Model.Observation { Id = "1" }; + var foundBundle = new Model.Bundle + { + Entry = new List + { + new Model.Bundle.EntryComponent + { + Resource = foundObservation, + }, + }, + }; + + var savedObservation = new Model.Observation(); + + // Mock search and update request + var handler = Utilities.CreateMockMessageHandler() + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle)); + + var fhirClient = Utilities.CreateMockFhirClient(handler); + + var ids = BuildIdCollection(); + var identityService = Substitute.For() + .Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids))); + + var observationGroup = Substitute.For(); + + var templateProcessor = Substitute.For, Model.Observation>>() + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation)); + + var cache = Substitute.For(); + var config = Substitute.For>(); + + var logger = Substitute.For(); + + var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger); + + var result = await service.SaveObservationAsync(config, observationGroup, ids); + + templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1); + } + private static IDictionary BuildIdCollection() { var lookup = IdentityLookupFactory.Instance.Create();