diff --git a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs index 5d8c0419..f3347259 100644 --- a/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs +++ b/src/lib/Microsoft.Health.Fhir.R4.Ingest/Service/R4FhirImportService.cs @@ -72,90 +72,70 @@ public virtual async Task SaveObservationAsync(ILookupTemplate + .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) + .RetryAsync(2, async (polyRes, attempt) => + { + // 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request. + // This can happen if 2 independent processes are updating the same Observation simultaneously. + // or + // The update operation failed because the Observation no longer exists. + // This can happen if a cached Observation was deleted from the FHIR Server. + + _logger.LogTrace("A conflict or precondition caused an Observation update to fail. Getting the most recent Observation."); + + // Attempt to get the most recent version of the Observation. + existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false); + + // If the Observation no longer exists on the FHIR Server, it was most likely deleted. + if (existingObservation == null) + { + _logger.LogTrace("A cached version of an Observation was deleted. Creating a new Observation."); + + // Remove the Observation from the cache (this version no longer exists on the FHIR Server. + _observationCache.Remove(cacheKey); + } + }) + .ExecuteAndCaptureAsync(async () => + { + if (existingObservation == null) + { + var newObservation = GenerateObservation(config, observationGroup, identifier, ids); + return (await _client.CreateAsync(newObservation).ConfigureAwait(false), ResourceOperation.Created); + } + + // Merge the new data with the existing Observation. + var mergedObservation = MergeObservation(config, existingObservation, observationGroup); + + // Check to see if there are any changes after merging. + if (mergedObservation.IsExactly(existingObservation)) + { + // There are no changes to the Observation - Do not update. + return (existingObservation, ResourceOperation.NoOperation); + } + + // Update the Observation. Some failures will be handled in the RetryAsync block above. + return (await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false), ResourceOperation.Updated); + }).ConfigureAwait(false); - if (existingObservation == null) + var exception = policyResult.FinalException; + + if (exception != null) { - var newObservation = GenerateObservation(config, observationGroup, identifier, ids); - result = await _client.CreateAsync(newObservation).ConfigureAwait(false); - operationType = ResourceOperation.Created; + throw exception; } - else - { - var policyResult = await Policy - .Handle(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed) - .RetryAsync(2, async (polyRes, attempt) => - { - // 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request. - // This can happen if 2 independent processes are updating the same Observation simultaneously. - // or - // The update operation failed because the Observation no longer exists. - // This can happen if a cached Observation was deleted from the FHIR Server. - - _logger.LogTrace("A conflict or precondition caused an Observation update to fail. Getting the most recent Observation."); - - // Attempt to get the most recent version of the Observation. - existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false); - - // If the Observation no longer exists on the FHIR Server, it was most likely deleted. - if (existingObservation == null) - { - _logger.LogTrace("A cached version of an Observation was deleted. Creating a new Observation."); - - // Remove the Observation from the cache (this version no longer exists on the FHIR Server. - _observationCache.Remove(cacheKey); - - // Create a new Observation. - var newObservation = GenerateObservation(config, observationGroup, identifier, ids); - result = await _client.CreateAsync(newObservation).ConfigureAwait(false); - return; - } - }) - .ExecuteAndCaptureAsync(async () => - { - if (result != null) - { - // If result is not null, this means that a cached Observation was deleted from the FHIR Server. - // The RetryAsync has handled this condition, so no further action is required, return the result. - operationType = ResourceOperation.Created; - return result; - } - - // Merge the new data with the existing Observation. - var mergedObservation = MergeObservation(config, existingObservation, observationGroup); - - // Check to see if there are any changes after merging. - if (mergedObservation.IsExactly(existingObservation)) - { - // There are no changes to the Observation - Do not update. - return existingObservation; - } - - // Update the Observation. Some failures will be handled in the RetryAsync block above. - operationType = ResourceOperation.Updated; - return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false); - }).ConfigureAwait(false); - - var exception = policyResult.FinalException; - - if (exception != null) - { - throw exception; - } - result = policyResult.Result; - } + var observation = policyResult.Result.observation; - _logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, operationType), 1); + _logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, policyResult.Result.operationType), 1); _observationCache.CreateEntry(cacheKey) .SetAbsoluteExpiration(DateTimeOffset.UtcNow.AddHours(1)) .SetSize(1) - .SetValue(result) + .SetValue(observation) .Dispose(); - return result.Id; + return observation.Id; } public virtual Model.Observation GenerateObservation(ILookupTemplate config, IObservationGroup grp, Model.Identifier observationId, IDictionary ids)