Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update how measurements are processed #196

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,23 @@ protected virtual IEnumerable<JToken> MatchTypeTokens(JObject token)
EnsureArg.IsNotNull(token, nameof(token));
var evaluator = CreateRequiredExpressionEvaluator(Template.TypeMatchExpression, nameof(Template.TypeMatchExpression));

JObject tokenClone = null;

foreach (var extractedToken in evaluator.SelectTokens(token))
{
// Add the extracted data as an element of the original data.
// This allows subsequent expressions access to data from the original event data
if (tokenClone == null)
{
tokenClone = new JObject(token);
}

var tokenClone = token.DeepClone() as JObject;
tokenClone.Add(MatchedToken, extractedToken);
yield return tokenClone;
tokenClone.Remove(MatchedToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work? We are returning the tokenClone and them updating the same reference to remove the property. I don't think we can do the remove. Anything downstream of this will potentially be missing the MatchedToken.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work as intended as this is a synchronous IEnumerable. We first yield the cloned JToken and then produce a Measurement from it. Finally we return here to remove the MatchedToken property and start the loop again. The code producing the Measurement does not hold onto a reference for the token it was given.

We have various unit tests which should be testing out this scenario, such as: https://github.com/microsoft/iomt-fhir/blob/main/test/Microsoft.Health.Fhir.Ingest.Template.UnitTests/CalculatedFunctionContentTemplateTests.cs#L79

}

tokenClone = null;
}

protected IExpressionEvaluator CreateRequiredExpressionEvaluator(TemplateExpression expression, string expressionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,42 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.IO;
using System.Text;
using EnsureThat;
using Microsoft.Azure.EventHubs;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Health.Fhir.Ingest.Data
{
public class EventDataWithJsonBodyToJTokenConverter : IConverter<EventData, JToken>
{
private const string BodyAttr = "Body";
private const string PropertiesAttr = "Properties";
private const string SystemPropertiesAttr = "SystemProperties";
private readonly JsonSerializer jsonSerializer = JsonSerializer.CreateDefault();

public JToken Convert(EventData input)
{
EnsureArg.IsNotNull(input, nameof(input));
JToken token = new JObject();
JToken body = null;

if (input.Body.Count > 0)
{
using (var stream = new MemoryStream(input.Body.Array))
using (StreamReader sr = new StreamReader(stream, Encoding.UTF8))
using (JsonReader reader = new JsonTextReader(sr))
{
body = JToken.ReadFrom(reader);
}
}

var body = input.Body.Count > 0
? JToken.Parse(Encoding.UTF8.GetString(input.Body.Array, input.Body.Offset, input.Body.Count))
: null;
var data = new { Body = body, input.Properties, input.SystemProperties };
var token = JToken.FromObject(data);
token[BodyAttr] = body;
token[PropertiesAttr] = JToken.FromObject(input.Properties, jsonSerializer);
token[SystemPropertiesAttr] = JToken.FromObject(input.SystemProperties, jsonSerializer);

return token;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -45,7 +46,7 @@ public MeasurementEventNormalizationService(
Data.IConverter<EventData, JToken> converter,
IExceptionTelemetryProcessor exceptionTelemetryProcessor,
int maxParallelism,
int asyncCollectorBatchSize = 200)
int asyncCollectorBatchSize = 25)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any data on the impact of this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No hard numbers. I had originally put in 200 as a starting point, though. With the intent on tweaking it as needed. I can revert this change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might be good for now. If we have enough time to compare just this change with out the others and we see a benefit then we can include.

{
_log = EnsureArg.IsNotNull(log, nameof(log));
_contentTemplate = EnsureArg.IsNotNull(contentTemplate, nameof(contentTemplate));
Expand Down Expand Up @@ -99,7 +100,6 @@ private async Task StartConsumer(ISourceBlock<EventData> producer, IEnumerableAs
var transformingConsumer = new TransformManyBlock<EventData, (string, IMeasurement)>(
async evt =>
{
var createdMeasurements = new List<(string, IMeasurement)>();
try
{
string partitionId = evt.SystemProperties.PartitionKey;
Expand All @@ -115,19 +115,7 @@ private async Task StartConsumer(ISourceBlock<EventData> producer, IEnumerableAs

var token = _converter.Convert(evt);

try
{
foreach (var measurement in _contentTemplate.GetMeasurements(token))
{
measurement.IngestionTimeUtc = evt.SystemProperties.EnqueuedTimeUtc;
createdMeasurements.Add((partitionId, measurement));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we can't just yield return here?

Copy link
Contributor Author

@rogordon01 rogordon01 Jun 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately a yield statement cannot existing inside of a try block if there is a catch block. That is the original reason why I create the custom IEnumerable.

https://docs.microsoft.com/en-us/archive/msdn-magazine/2017/june/essential-net-custom-iterators-with-yield#yield-statement-requirements

}
}
catch (Exception ex)
{
// Translate all Normalization Mapping exceptions into a common type for easy identification.
throw new NormalizationDataMappingException(ex);
}
return CreateMeasurementIterator(token, partitionId, exceptions, errorConsumer, evt);
}
catch (Exception ex)
{
Expand All @@ -137,7 +125,7 @@ private async Task StartConsumer(ISourceBlock<EventData> producer, IEnumerableAs
}
}

return createdMeasurements;
return Enumerable.Empty<(string, IMeasurement)>();
});

var asyncCollectorConsumer = new ActionBlock<(string, IMeasurement)[]>(
Expand Down Expand Up @@ -201,5 +189,70 @@ private Task<bool> ProcessErrorAsync(Exception ex, EventData data)
var handled = _exceptionTelemetryProcessor.HandleException(ex, _log);
return Task.FromResult(!handled);
}

private IEnumerable<(string, IMeasurement)> CreateMeasurementIterator(JToken token, string partitionId, ConcurrentBag<Exception> exceptions, Func<Exception, EventData, Task<bool>> errorConsumer, EventData evt)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, we batched all Measurements per DeviceEvent before sending down the processing pipeline. If there was an error with one Measurement we wouldn't send any Measurements for that DeviceEvent.

This change streams each Measurement onto the processing pipeline as soon as it is generated

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So one consequence of this is we have bad data and spin, we could be now sending more repeat/duplicate messages down stream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true if we were to spin. But I had thought that we decided not to spin, and simply discard bad data? Until we could send it over to the EMS?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will continue to spin by default though I did add the ability to change the strategy if desired.

{
IEnumerator<Measurement> enumerable = null;
Action<Exception> storeNormalizedException = ex =>
{
var normalizationException = new NormalizationDataMappingException(ex);
if (errorConsumer(normalizationException, evt).ConfigureAwait(false).GetAwaiter().GetResult())
{
exceptions.Add(normalizationException);
}
};

try
{
enumerable = _contentTemplate.GetMeasurements(token).GetEnumerator();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be we able to remove the lower while loop and just yield return as we iterate through each element here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may also want to look at simplifying this class completely. I don't know if using the TPL adds much value. We can also at switching over to async enumerables https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8

https://docs.microsoft.com/en-us/dotnet/csharp/whats-new/tutorials/generate-consume-asynchronous-stream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment about yield statements inside of try/catch blocks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching this to an AsyncEnumberable could be a good idea. At the very least it would simplify things. I did make use of the pipelines ability to batch, though. But we could see if that could be refactored.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the other perf PR we are no longer using this class. Thoughts on reverting this change? It will still be used for the Azure Function implementation but we wouldn't be taking the risk of potentially introducing a new bug for that path. The new metric is great, and we can work to port it over to the new implementation.

}
catch (Exception ex)
{
storeNormalizedException(ex);
yield break;
}

(string, IMeasurement) currentValue = (partitionId, null);
var shouldLoop = true;
var stopWatch = new Stopwatch();

while (shouldLoop)
{
try
{
stopWatch.Start();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some helper classes that simplify this. We may need to port them from PaaS to OSS.

shouldLoop = enumerable.MoveNext();
stopWatch.Stop();

_log.LogMetric(
IomtMetrics.NormalizedEventGenerationTimeMs(partitionId),
stopWatch.ElapsedMilliseconds);

stopWatch.Reset();

if (shouldLoop)
{
var measurement = enumerable.Current;
measurement.IngestionTimeUtc = evt.SystemProperties.EnqueuedTimeUtc;
currentValue = (partitionId, measurement);
}
else
{
break;
}
}
catch (Exception ex)
{
// Translate all Normalization Mapping exceptions into a common type for easy identification.
storeNormalizedException(ex);
shouldLoop = false;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dustinburson Do we skip processing the remainder of the DeviceEvent now that one Measurement produced an Exception? Or do we simply skip this Measurement, and allow the remaining Measurements to be generated?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it we would keep processing but we would need to discuss further.

if (shouldLoop)
{
yield return currentValue;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ private IomtMetricDefinition(string metricName)

public static IomtMetricDefinition NormalizedEvent { get; } = new IomtMetricDefinition(nameof(NormalizedEvent));

public static IomtMetricDefinition NormalizedEventGenerationTimeMs { get; } = new IomtMetricDefinition(nameof(NormalizedEventGenerationTimeMs));

public static IomtMetricDefinition Measurement { get; } = new IomtMetricDefinition(nameof(Measurement));

public static IomtMetricDefinition MeasurementGroup { get; } = new IomtMetricDefinition(nameof(MeasurementGroup));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,16 @@ public static Metric HandledException(string exceptionName, string connectorStag
{
return exceptionName.ToErrorMetric(connectorStage, ErrorType.GeneralError, ErrorSeverity.Critical);
}

/// <summary>
/// The time it takes to generate a Normalized Event.
/// </summary>
/// <param name="partitionId">The partition id of the events being consumed from the event hub partition </param>
public static Metric NormalizedEventGenerationTimeMs(string partitionId = null)
{
return IomtMetricDefinition.NormalizedEventGenerationTimeMs
.CreateBaseMetric(Category.Traffic, ConnectorOperation.Normalization)
.AddDimension(_partitionDimension, partitionId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\lib\Microsoft.Health.Expressions\Microsoft.Health.Expressions.csproj" />
<ProjectReference Include="..\..\src\lib\Microsoft.Health.Fhir.Ingest\Microsoft.Health.Fhir.Ingest.csproj" />
<ProjectReference Include="..\Microsoft.Health.Tests.Common\Microsoft.Health.Tests.Common.csproj" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public async Task GivenMultipleEventsWithOneResultPer_WhenProcessAsync_ThenEachE
_template.GetMeasurements(null).ReturnsForAnyArgs(new[] { Substitute.For<Measurement>() });
var events = Enumerable.Range(0, 10).Select(i => BuildEvent(i)).ToArray();

var srv = new MeasurementEventNormalizationService(_logger, _template, _exceptionTelemetryProcessor);
var srv = new MeasurementEventNormalizationService(_logger, _template, _converter, _exceptionTelemetryProcessor, 3, 25);
await srv.ProcessAsync(events, _consumer);

_template.ReceivedWithAnyArgs(events.Length).GetMeasurements(null);
Expand Down