Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retry logic to handle invalid cache and no changes to Observation #161

Merged
merged 4 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ public enum ResourceOperation
/// FHIR resource updated
/// </summary>
Updated,

/// <summary>
/// FHIR resource no operation performed
/// </summary>
NoOperation,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,66 @@ public virtual async Task<string> SaveObservationAsync(ILookupTemplate<IFhirTemp
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
}

Model.Observation result;
Model.Observation result = null;
ResourceOperation operationType = ResourceOperation.NoOperation;

if (existingObservation == null)
{
var newObservation = GenerateObservation(config, observationGroup, identifier, ids);
result = await _client.CreateAsync(newObservation).ConfigureAwait(false);
_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created), 1);
operationType = ResourceOperation.Created;
}
else
{
Model.Observation mergedObservation = null;
namalu marked this conversation as resolved.
Show resolved Hide resolved

var policyResult = await Policy<Model.Observation>
.Handle<FhirOperationException>(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed)
.RetryAsync(2, async (polyRes, attempt) =>
{
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
})
.ExecuteAndCaptureAsync(async () =>
{
var mergedObservation = MergeObservation(config, existingObservation, observationGroup);
return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false);
}).ConfigureAwait(false);
.Handle<FhirOperationException>(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed)
.RetryAsync(2, async (polyRes, attempt) =>
namalu marked this conversation as resolved.
Show resolved Hide resolved
{
// 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request.
// This can happen if 2 independent processes are updating the same Observation simultaneously.
// or
// The update operation failed because the Observation no longer exists.
// This can happen if a cached Observation was deleted from the FHIR Server.

// Attempt to get the most recent version of the Observation.
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
namalu marked this conversation as resolved.
Show resolved Hide resolved

// If the Observation no longer exists on the FHIR Server, it was most likely deleted.
if (existingObservation == null)
{
// Remove the Observation from the cache (this version no longer exists on the FHIR Server.
_observationCache.Remove(cacheKey);

// Create a new Observation.
result = await _client.CreateAsync(mergedObservation).ConfigureAwait(false);
namalu marked this conversation as resolved.
Show resolved Hide resolved
}
})
.ExecuteAndCaptureAsync(async () =>
{
if (result != null)
namalu marked this conversation as resolved.
Show resolved Hide resolved
{
// If result is not null, this means that a cached Observation was deleted from the FHIR Server.
// The RetryAsync has handled this condition, so no further action is required, return the result.
operationType = ResourceOperation.Created;
return result;
}

// Merge the new data with the existing Observation.
namalu marked this conversation as resolved.
Show resolved Hide resolved
mergedObservation = MergeObservation(config, existingObservation, observationGroup);

// Check to see if there are any changes after merging.
if (mergedObservation.IsExactly(existingObservation))
namalu marked this conversation as resolved.
Show resolved Hide resolved
{
// There are no changes to the Observation - Do not update.
return existingObservation;
}

// Update the Observation. Some failures will be handled in the RetryAsync block above.
operationType = ResourceOperation.Updated;
return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false);
}).ConfigureAwait(false);

var exception = policyResult.FinalException;

Expand All @@ -101,9 +141,10 @@ public virtual async Task<string> SaveObservationAsync(ILookupTemplate<IFhirTemp
}

result = policyResult.Result;
_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Updated), 1);
}

_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, operationType), 1);

_observationCache.CreateEntry(cacheKey)
.SetAbsoluteExpiration(DateTimeOffset.UtcNow.AddHours(1))
.SetSize(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
using System.Threading.Tasks;
using Hl7.Fhir.Rest;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Health.Common.Telemetry;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Telemetry;
using Microsoft.Health.Fhir.Ingest.Template;
using Microsoft.Health.Logging.Telemetry;
using Microsoft.Health.Tests.Common;
Expand Down Expand Up @@ -87,6 +89,7 @@ public async void GivenNotFoundObservation_WhenSaveObservationAsync_ThenCreateIn

handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -121,7 +124,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation()))
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation));
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation() { Id = "2" }));

var cache = Substitute.For<IMemoryCache>();
var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();
Expand All @@ -135,6 +138,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok
templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -168,7 +172,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn

// Mock search and update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle2))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle1))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException(), x => savedObservation));

var fhirClient = Utilities.CreateMockFhirClient(handler);
Expand All @@ -195,6 +199,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn
templateProcessor.ReceivedWithAnyArgs(2).MergeObservation(default, default, default);
handler.Received(2).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(2).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -233,6 +238,142 @@ public void GivenValidTemplate_WhenGenerateObservation_ExpectedReferencesSet_Tes
});
}

[Fact]
public async Task GivenCachedObservationDeleted_WhenGenerateObservation_ThenCacheIsUpdatedAndCreateInvoked_Test()
{
var savedObservation = new Model.Observation { Id = "1" };

// Mock the cached Observation
var cache = Substitute.For<IMemoryCache>();
cache.TryGetValue(Arg.Any<object>(), out Model.Observation observation)
.Returns(x =>
{
x[1] = new Model.Observation();
return true;
});

// Mock update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException()))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(new Model.Bundle()))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post)).Returns(savedObservation));

var fhirClient = Utilities.CreateMockFhirClient(handler);

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(savedObservation));

var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
cache.Received(1).Remove(Arg.Any<string>());
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1);
cache.Received(1).Set(Arg.Any<object>(), savedObservation);
}

[Fact]
public async Task GivenCachedObservationUnchanged_WhenGenerateObservation_ThenCacheNoOperation_Test()
{
var cachedObservation = new Model.Observation { Id = "1" };

// Mock the cached Observation
var cache = Substitute.For<IMemoryCache>();
cache.TryGetValue(Arg.Any<object>(), out Model.Observation observation)
.Returns(x =>
{
x[1] = cachedObservation;
return true;
});

var fhirClient = Utilities.CreateMockFhirClient();

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(cachedObservation));

var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1);
cache.Received(1).Set(Arg.Any<object>(), cachedObservation);
}

[Fact]
public async void GivenFoundObservationUnchanged_WhenSaveObservationAsync_ThenUpdateInvoked_Test()
{
var foundObservation = new Model.Observation { Id = "1" };
var foundBundle = new Model.Bundle
{
Entry = new List<Model.Bundle.EntryComponent>
{
new Model.Bundle.EntryComponent
{
Resource = foundObservation,
},
},
};

var savedObservation = new Model.Observation();

// Mock search and update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle));

var fhirClient = Utilities.CreateMockFhirClient(handler);

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation));

var cache = Substitute.For<IMemoryCache>();
var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1);
}

private static IDictionary<Data.ResourceType, string> BuildIdCollection()
{
var lookup = IdentityLookupFactory.Instance.Create();
Expand Down