Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retry logic to handle invalid cache and no changes to Observation #161

Merged
merged 4 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ public enum ResourceOperation
/// FHIR resource updated
/// </summary>
Updated,

/// <summary>
/// FHIR resource no operation performed
/// </summary>
NoOperation,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,45 +72,70 @@ public virtual async Task<string> SaveObservationAsync(ILookupTemplate<IFhirTemp
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
}

Model.Observation result;
if (existingObservation == null)
{
var newObservation = GenerateObservation(config, observationGroup, identifier, ids);
result = await _client.CreateAsync(newObservation).ConfigureAwait(false);
_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created), 1);
}
else
{
var policyResult = await Policy<Model.Observation>
.Handle<FhirOperationException>(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed)
.RetryAsync(2, async (polyRes, attempt) =>
var policyResult = await Policy<(Model.Observation observation, ResourceOperation operationType)>
.Handle<FhirOperationException>(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed)
.RetryAsync(2, async (polyRes, attempt) =>
{
// 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request.
// This can happen if 2 independent processes are updating the same Observation simultaneously.
// or
// The update operation failed because the Observation no longer exists.
// This can happen if a cached Observation was deleted from the FHIR Server.

_logger.LogTrace("A conflict or precondition caused an Observation update to fail. Getting the most recent Observation.");

// Attempt to get the most recent version of the Observation.
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);

// If the Observation no longer exists on the FHIR Server, it was most likely deleted.
if (existingObservation == null)
{
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
})
.ExecuteAndCaptureAsync(async () =>
_logger.LogTrace("A cached version of an Observation was deleted. Creating a new Observation.");

// Remove the Observation from the cache (this version no longer exists on the FHIR Server.
_observationCache.Remove(cacheKey);
}
})
.ExecuteAndCaptureAsync(async () =>
{
if (existingObservation == null)
{
var mergedObservation = MergeObservation(config, existingObservation, observationGroup);
return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false);
}).ConfigureAwait(false);
var newObservation = GenerateObservation(config, observationGroup, identifier, ids);
return (await _client.CreateAsync(newObservation).ConfigureAwait(false), ResourceOperation.Created);
}

var exception = policyResult.FinalException;
// Merge the new data with the existing Observation.
var mergedObservation = MergeObservation(config, existingObservation, observationGroup);

if (exception != null)
{
throw exception;
}
// Check to see if there are any changes after merging.
if (mergedObservation.IsExactly(existingObservation))
{
// There are no changes to the Observation - Do not update.
return (existingObservation, ResourceOperation.NoOperation);
}

// Update the Observation. Some failures will be handled in the RetryAsync block above.
return (await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false), ResourceOperation.Updated);
}).ConfigureAwait(false);

result = policyResult.Result;
_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Updated), 1);
var exception = policyResult.FinalException;

if (exception != null)
{
throw exception;
}

var observation = policyResult.Result.observation;

_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, policyResult.Result.operationType), 1);

_observationCache.CreateEntry(cacheKey)
.SetAbsoluteExpiration(DateTimeOffset.UtcNow.AddHours(1))
.SetSize(1)
.SetValue(result)
.SetValue(observation)
.Dispose();

return result.Id;
return observation.Id;
}

public virtual Model.Observation GenerateObservation(ILookupTemplate<IFhirTemplate> config, IObservationGroup grp, Model.Identifier observationId, IDictionary<ResourceType, string> ids)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
using System.Threading.Tasks;
using Hl7.Fhir.Rest;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Health.Common.Telemetry;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Telemetry;
using Microsoft.Health.Fhir.Ingest.Template;
using Microsoft.Health.Logging.Telemetry;
using Microsoft.Health.Tests.Common;
Expand Down Expand Up @@ -87,6 +89,7 @@ public async void GivenNotFoundObservation_WhenSaveObservationAsync_ThenCreateIn

handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -121,7 +124,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation()))
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation));
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation() { Id = "2" }));

var cache = Substitute.For<IMemoryCache>();
var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();
Expand All @@ -135,6 +138,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok
templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -168,7 +172,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn

// Mock search and update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle2))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle1))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException(), x => savedObservation));

var fhirClient = Utilities.CreateMockFhirClient(handler);
Expand All @@ -195,6 +199,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn
templateProcessor.ReceivedWithAnyArgs(2).MergeObservation(default, default, default);
handler.Received(2).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(2).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -233,6 +238,143 @@ public void GivenValidTemplate_WhenGenerateObservation_ExpectedReferencesSet_Tes
});
}

[Fact]
public async Task GivenCachedObservationDeleted_WhenGenerateObservation_ThenCacheIsUpdatedAndCreateInvoked_Test()
{
var savedObservation = new Model.Observation { Id = "1" };

// Mock the cached Observation
var cache = Substitute.For<IMemoryCache>();
cache.TryGetValue(Arg.Any<object>(), out Model.Observation observation)
.Returns(x =>
{
x[1] = new Model.Observation();
return true;
});

// Mock update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException()))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(new Model.Bundle()))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post)).Returns(savedObservation));

var fhirClient = Utilities.CreateMockFhirClient(handler);

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation()))
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation { Id = "2" }));

var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
cache.Received(1).Remove(Arg.Any<string>());
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1);
cache.Received(1).Set(Arg.Any<object>(), savedObservation);
}

[Fact]
public async Task GivenCachedObservationUnchanged_WhenGenerateObservation_ThenCacheNoOperation_Test()
{
var cachedObservation = new Model.Observation { Id = "1" };

// Mock the cached Observation
var cache = Substitute.For<IMemoryCache>();
cache.TryGetValue(Arg.Any<object>(), out Model.Observation observation)
.Returns(x =>
{
x[1] = cachedObservation;
return true;
});

var fhirClient = Utilities.CreateMockFhirClient();

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(cachedObservation));

var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1);
cache.Received(1).Set(Arg.Any<object>(), cachedObservation);
}

[Fact]
public async void GivenFoundObservationUnchanged_WhenSaveObservationAsync_ThenUpdateInvoked_Test()
{
var foundObservation = new Model.Observation { Id = "1" };
var foundBundle = new Model.Bundle
{
Entry = new List<Model.Bundle.EntryComponent>
{
new Model.Bundle.EntryComponent
{
Resource = foundObservation,
},
},
};

var savedObservation = new Model.Observation();

// Mock search and update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle));

var fhirClient = Utilities.CreateMockFhirClient(handler);

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation));

var cache = Substitute.For<IMemoryCache>();
var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1);
}

private static IDictionary<Data.ResourceType, string> BuildIdCollection()
{
var lookup = IdentityLookupFactory.Instance.Create();
Expand Down