From 5d4f01cba26a0796750766c1c74bba6c044800a0 Mon Sep 17 00:00:00 2001 From: Nate Malubay Date: Mon, 24 Jan 2022 15:29:07 -0800 Subject: [PATCH 1/3] Improve retry logic to handle invalid cache and no changes to Observation --- .../Data/ResourceOperation.cs | 5 + .../Service/R4FhirImportService.cs | 49 +++++- .../Service/R4FhirImportServiceTests.cs | 145 +++++++++++++++++- 3 files changed, 193 insertions(+), 6 deletions(-) diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs index a2801865..46ef8496 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Data/ResourceOperation.cs @@ -16,5 +16,10 @@ public enum ResourceOperation /// FHIR resource updated /// Updated, + + /// + /// FHIR resource no operation performed + /// + NoOperation, } } diff --git a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs index be58aa34..a3e33699 100644 --- a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs +++ b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs @@ -63,24 +63,64 @@ public virtual async Task SaveObservationAsync(ILookupTemplate .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) .RetryAsync(2, async (polyRes, attempt) => { + // 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request. + // This can happen if 2 independent processes are updating the same Observation simultaneously. + // or + // The update operation failed because the Observation no longer exists. + // This can happen if a cached Observation was deleted from the FHIR Server. + + // Attempt to get the most recent version of the Observation. existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false); + + // If the Observation no longer exists on the FHIR Server, it was most likely deleted. + if (existingObservation == null) + { + // Remove the Observation from the cache (this version no longer exists on the FHIR Server. + _observationCache.Remove(cacheKey); + + // Create a new Observation. + result = await _client.CreateAsync(mergedObservation).ConfigureAwait(false); + } }) .ExecuteAndCaptureAsync(async () => { - var mergedObservation = MergeObservation(config, existingObservation, observationGroup); + if (result != null) + { + // If result is not null, this means that a cached Observation was deleted from the FHIR Server. + // The RetryAsync has handled this condition, so no further action is required, return the result. + operationType = ResourceOperation.Created; + return result; + } + + // Merge the new data with the existing Observation. + mergedObservation = MergeObservation(config, existingObservation, observationGroup); + + // Check to see if there are any changes after merging. + if (mergedObservation.IsExactly(existingObservation)) + { + // There are no changes to the Observation - Do not update. + return existingObservation; + } + + // Update the Observation. Some failures will be handled in the RetryAsync block above. + operationType = ResourceOperation.Updated; return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false); }).ConfigureAwait(false); @@ -92,9 +132,10 @@ public virtual async Task SaveObservationAsync(ILookupTemplate(msg => msg.Method == HttpMethod.Get)); handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Post)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1); } [Fact] @@ -121,7 +124,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok var templateProcessor = Substitute.For, Model.Observation>>() .Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation())) - .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation)); + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation() { Id = "2" })); var cache = Substitute.For(); var config = Substitute.For>(); @@ -135,6 +138,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1); } [Fact] @@ -168,7 +172,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn // Mock search and update request var handler = Utilities.CreateMockMessageHandler() - .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle2)) + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle1)) .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException(), x => savedObservation)); var fhirClient = Utilities.CreateMockFhirClient(handler); @@ -195,6 +199,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn templateProcessor.ReceivedWithAnyArgs(2).MergeObservation(default, default, default); handler.Received(2).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); handler.Received(2).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1); } [Fact] @@ -233,6 +238,142 @@ public void GivenValidTemplate_WhenGenerateObservation_ExpectedReferencesSet_Tes }); } + [Fact] + public async Task GivenCachedObservationDeleted_WhenGenerateObservation_ThenCacheIsUpdatedAndCreateInvoked_Test() + { + var savedObservation = new Model.Observation { Id = "1" }; + + // Mock the cached Observation + var cache = Substitute.For(); + cache.TryGetValue(Arg.Any(), out Model.Observation observation) + .Returns(x => + { + x[1] = new Model.Observation(); + return true; + }); + + // Mock update request + var handler = Utilities.CreateMockMessageHandler() + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException())) + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(new Model.Bundle())) + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Post)).Returns(savedObservation)); + + var fhirClient = Utilities.CreateMockFhirClient(handler); + + var ids = BuildIdCollection(); + var identityService = Substitute.For() + .Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids))); + + var observationGroup = Substitute.For(); + + var templateProcessor = Substitute.For, Model.Observation>>() + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(savedObservation)); + + var config = Substitute.For>(); + + var logger = Substitute.For(); + + var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger); + + var result = await service.SaveObservationAsync(config, observationGroup, ids); + + var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created); + + templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); + cache.Received(1).Remove(Arg.Any()); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Put)); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Post)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1); + cache.Received(1).Set(Arg.Any(), savedObservation); + } + + [Fact] + public async Task GivenCachedObservationUnchanged_WhenGenerateObservation_ThenCacheNoOperation_Test() + { + var cachedObservation = new Model.Observation { Id = "1" }; + + // Mock the cached Observation + var cache = Substitute.For(); + cache.TryGetValue(Arg.Any(), out Model.Observation observation) + .Returns(x => + { + x[1] = cachedObservation; + return true; + }); + + var fhirClient = Utilities.CreateMockFhirClient(); + + var ids = BuildIdCollection(); + var identityService = Substitute.For() + .Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids))); + + var observationGroup = Substitute.For(); + + var templateProcessor = Substitute.For, Model.Observation>>() + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(cachedObservation)); + + var config = Substitute.For>(); + + var logger = Substitute.For(); + + var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger); + + var result = await service.SaveObservationAsync(config, observationGroup, ids); + + var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created); + + templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1); + cache.Received(1).Set(Arg.Any(), cachedObservation); + } + + [Fact] + public async void GivenFoundObservationUnchanged_WhenSaveObservationAsync_ThenUpdateInvoked_Test() + { + var foundObservation = new Model.Observation { Id = "1" }; + var foundBundle = new Model.Bundle + { + Entry = new List + { + new Model.Bundle.EntryComponent + { + Resource = foundObservation, + }, + }, + }; + + var savedObservation = new Model.Observation(); + + // Mock search and update request + var handler = Utilities.CreateMockMessageHandler() + .Mock(m => m.GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle)); + + var fhirClient = Utilities.CreateMockFhirClient(handler); + + var ids = BuildIdCollection(); + var identityService = Substitute.For() + .Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids))); + + var observationGroup = Substitute.For(); + + var templateProcessor = Substitute.For, Model.Observation>>() + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation)); + + var cache = Substitute.For(); + var config = Substitute.For>(); + + var logger = Substitute.For(); + + var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger); + + var result = await service.SaveObservationAsync(config, observationGroup, ids); + + templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default); + handler.Received(1).GetReturnContent(Arg.Is(msg => msg.Method == HttpMethod.Get)); + logger.Received(1).LogMetric(Arg.Is(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1); + } + private static IDictionary BuildIdCollection() { var lookup = IdentityLookupFactory.Instance.Create(); From 21b92df5e53bf468d44c339979ba50f1f6e46ead Mon Sep 17 00:00:00 2001 From: Nate Malubay Date: Tue, 25 Jan 2022 13:00:08 -0800 Subject: [PATCH 2/3] Added logs to track conflict and precondition failures --- .../Service/R4FhirImportService.cs | 12 ++++++++---- .../Service/R4FhirImportServiceTests.cs | 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs index 759ae0e0..5d8c0419 100644 --- a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs +++ b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs @@ -83,8 +83,6 @@ public virtual async Task SaveObservationAsync(ILookupTemplate .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) .RetryAsync(2, async (polyRes, attempt) => @@ -95,17 +93,23 @@ public virtual async Task SaveObservationAsync(ILookupTemplate @@ -119,7 +123,7 @@ public virtual async Task SaveObservationAsync(ILookupTemplate(); var templateProcessor = Substitute.For, Model.Observation>>() - .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(savedObservation)); + .Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation())) + .Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation { Id = "2" })); var config = Substitute.For>(); From e16efcfbdbd1ad1843ba9d6cb5c3e7f0997f5c8a Mon Sep 17 00:00:00 2001 From: Nate Malubay Date: Tue, 25 Jan 2022 14:28:56 -0800 Subject: [PATCH 3/3] Refactored SaveObservation implementation to be more succinct --- .../Service/R4FhirImportService.cs | 126 ++++++++---------- 1 file changed, 53 insertions(+), 73 deletions(-) diff --git a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs index 5d8c0419..f3347259 100644 --- a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs +++ b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs @@ -72,90 +72,70 @@ public virtual async Task SaveObservationAsync(ILookupTemplate + .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) + .RetryAsync(2, async (polyRes, attempt) => + { + // 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request. + // This can happen if 2 independent processes are updating the same Observation simultaneously. + // or + // The update operation failed because the Observation no longer exists. + // This can happen if a cached Observation was deleted from the FHIR Server. + + _logger.LogTrace("A conflict or precondition caused an Observation update to fail. Getting the most recent Observation."); + + // Attempt to get the most recent version of the Observation. + existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false); + + // If the Observation no longer exists on the FHIR Server, it was most likely deleted. + if (existingObservation == null) + { + _logger.LogTrace("A cached version of an Observation was deleted. Creating a new Observation."); + + // Remove the Observation from the cache (this version no longer exists on the FHIR Server. + _observationCache.Remove(cacheKey); + } + }) + .ExecuteAndCaptureAsync(async () => + { + if (existingObservation == null) + { + var newObservation = GenerateObservation(config, observationGroup, identifier, ids); + return (await _client.CreateAsync(newObservation).ConfigureAwait(false), ResourceOperation.Created); + } + + // Merge the new data with the existing Observation. + var mergedObservation = MergeObservation(config, existingObservation, observationGroup); + + // Check to see if there are any changes after merging. + if (mergedObservation.IsExactly(existingObservation)) + { + // There are no changes to the Observation - Do not update. + return (existingObservation, ResourceOperation.NoOperation); + } + + // Update the Observation. Some failures will be handled in the RetryAsync block above. + return (await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false), ResourceOperation.Updated); + }).ConfigureAwait(false); - if (existingObservation == null) + var exception = policyResult.FinalException; + + if (exception != null) { - var newObservation = GenerateObservation(config, observationGroup, identifier, ids); - result = await _client.CreateAsync(newObservation).ConfigureAwait(false); - operationType = ResourceOperation.Created; + throw exception; } - else - { - var policyResult = await Policy - .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) - .RetryAsync(2, async (polyRes, attempt) => - { - // 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request. - // This can happen if 2 independent processes are updating the same Observation simultaneously. - // or - // The update operation failed because the Observation no longer exists. - // This can happen if a cached Observation was deleted from the FHIR Server. - - _logger.LogTrace("A conflict or precondition caused an Observation update to fail. Getting the most recent Observation."); - - // Attempt to get the most recent version of the Observation. - existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false); - - // If the Observation no longer exists on the FHIR Server, it was most likely deleted. - if (existingObservation == null) - { - _logger.LogTrace("A cached version of an Observation was deleted. Creating a new Observation."); - - // Remove the Observation from the cache (this version no longer exists on the FHIR Server. - _observationCache.Remove(cacheKey); - - // Create a new Observation. - var newObservation = GenerateObservation(config, observationGroup, identifier, ids); - result = await _client.CreateAsync(newObservation).ConfigureAwait(false); - return; - } - }) - .ExecuteAndCaptureAsync(async () => - { - if (result != null) - { - // If result is not null, this means that a cached Observation was deleted from the FHIR Server. - // The RetryAsync has handled this condition, so no further action is required, return the result. - operationType = ResourceOperation.Created; - return result; - } - - // Merge the new data with the existing Observation. - var mergedObservation = MergeObservation(config, existingObservation, observationGroup); - - // Check to see if there are any changes after merging. - if (mergedObservation.IsExactly(existingObservation)) - { - // There are no changes to the Observation - Do not update. - return existingObservation; - } - - // Update the Observation. Some failures will be handled in the RetryAsync block above. - operationType = ResourceOperation.Updated; - return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false); - }).ConfigureAwait(false); - - var exception = policyResult.FinalException; - - if (exception != null) - { - throw exception; - } - result = policyResult.Result; - } + var observation = policyResult.Result.observation; - _logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, operationType), 1); + _logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, policyResult.Result.operationType), 1); _observationCache.CreateEntry(cacheKey) .SetAbsoluteExpiration(DateTimeOffset.UtcNow.AddHours(1)) .SetSize(1) - .SetValue(result) + .SetValue(observation) .Dispose(); - return result.Id; + return observation.Id; } public virtual Model.Observation GenerateObservation(ILookupTemplate config, IObservationGroup grp, Model.Identifier observationId, IDictionary ids)