diff --git a/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs b/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs index 67d625e1..7bf845cf 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest.Template/MeasurementExtractor.cs @@ -126,15 +126,23 @@ protected virtual IEnumerable MatchTypeTokens(JObject token) EnsureArg.IsNotNull(token, nameof(token)); var evaluator = CreateRequiredExpressionEvaluator(Template.TypeMatchExpression, nameof(Template.TypeMatchExpression)); + JObject tokenClone = null; + foreach (var extractedToken in evaluator.SelectTokens(token)) { // Add the extracted data as an element of the original data. // This allows subsequent expressions access to data from the original event data + if (tokenClone == null) + { + tokenClone = new JObject(token); + } - var tokenClone = token.DeepClone() as JObject; tokenClone.Add(MatchedToken, extractedToken); yield return tokenClone; + tokenClone.Remove(MatchedToken); } + + tokenClone = null; } protected IExpressionEvaluator CreateRequiredExpressionEvaluator(TemplateExpression expression, string expressionName) diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Data/EventDataWithJsonBodyToJTokenConverter.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Data/EventDataWithJsonBodyToJTokenConverter.cs index 70c6dfbf..d605a08d 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Data/EventDataWithJsonBodyToJTokenConverter.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Data/EventDataWithJsonBodyToJTokenConverter.cs @@ -3,24 +3,42 @@ // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- +using System.IO; using System.Text; using EnsureThat; using Microsoft.Azure.EventHubs; +using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace Microsoft.Health.Fhir.Ingest.Data { public class EventDataWithJsonBodyToJTokenConverter : IConverter { + private const string BodyAttr = "Body"; + private const string PropertiesAttr = "Properties"; + private const string SystemPropertiesAttr = "SystemProperties"; + private readonly JsonSerializer jsonSerializer = JsonSerializer.CreateDefault(); + public JToken Convert(EventData input) { EnsureArg.IsNotNull(input, nameof(input)); + JToken token = new JObject(); + JToken body = null; + + if (input.Body.Count > 0) + { + using (var stream = new MemoryStream(input.Body.Array)) + using (StreamReader sr = new StreamReader(stream, Encoding.UTF8)) + using (JsonReader reader = new JsonTextReader(sr)) + { + body = JToken.ReadFrom(reader); + } + } - var body = input.Body.Count > 0 - ? JToken.Parse(Encoding.UTF8.GetString(input.Body.Array, input.Body.Offset, input.Body.Count)) - : null; - var data = new { Body = body, input.Properties, input.SystemProperties }; - var token = JToken.FromObject(data); + token[BodyAttr] = body; + token[PropertiesAttr] = JToken.FromObject(input.Properties, jsonSerializer); + token[SystemPropertiesAttr] = JToken.FromObject(input.SystemProperties, jsonSerializer); + return token; } } diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs index 0cafa0f6..8d41d261 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs @@ -6,6 +6,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -45,7 +46,7 @@ public MeasurementEventNormalizationService( Data.IConverter converter, IExceptionTelemetryProcessor exceptionTelemetryProcessor, int maxParallelism, - int asyncCollectorBatchSize = 200) + int asyncCollectorBatchSize = 25) { _log = EnsureArg.IsNotNull(log, nameof(log)); _contentTemplate = EnsureArg.IsNotNull(contentTemplate, nameof(contentTemplate)); @@ -99,7 +100,6 @@ private async Task StartConsumer(ISourceBlock producer, IEnumerableAs var transformingConsumer = new TransformManyBlock( async evt => { - var createdMeasurements = new List<(string, IMeasurement)>(); try { string partitionId = evt.SystemProperties.PartitionKey; @@ -115,19 +115,7 @@ private async Task StartConsumer(ISourceBlock producer, IEnumerableAs var token = _converter.Convert(evt); - try - { - foreach (var measurement in _contentTemplate.GetMeasurements(token)) - { - measurement.IngestionTimeUtc = evt.SystemProperties.EnqueuedTimeUtc; - createdMeasurements.Add((partitionId, measurement)); - } - } - catch (Exception ex) - { - // Translate all Normalization Mapping exceptions into a common type for easy identification. - throw new NormalizationDataMappingException(ex); - } + return CreateMeasurementIterator(token, partitionId, exceptions, errorConsumer, evt); } catch (Exception ex) { @@ -137,7 +125,7 @@ private async Task StartConsumer(ISourceBlock producer, IEnumerableAs } } - return createdMeasurements; + return Enumerable.Empty<(string, IMeasurement)>(); }); var asyncCollectorConsumer = new ActionBlock<(string, IMeasurement)[]>( @@ -201,5 +189,70 @@ private Task ProcessErrorAsync(Exception ex, EventData data) var handled = _exceptionTelemetryProcessor.HandleException(ex, _log); return Task.FromResult(!handled); } + + private IEnumerable<(string, IMeasurement)> CreateMeasurementIterator(JToken token, string partitionId, ConcurrentBag exceptions, Func> errorConsumer, EventData evt) + { + IEnumerator enumerable = null; + Action storeNormalizedException = ex => + { + var normalizationException = new NormalizationDataMappingException(ex); + if (errorConsumer(normalizationException, evt).ConfigureAwait(false).GetAwaiter().GetResult()) + { + exceptions.Add(normalizationException); + } + }; + + try + { + enumerable = _contentTemplate.GetMeasurements(token).GetEnumerator(); + } + catch (Exception ex) + { + storeNormalizedException(ex); + yield break; + } + + (string, IMeasurement) currentValue = (partitionId, null); + var shouldLoop = true; + var stopWatch = new Stopwatch(); + + while (shouldLoop) + { + try + { + stopWatch.Start(); + shouldLoop = enumerable.MoveNext(); + stopWatch.Stop(); + + _log.LogMetric( + IomtMetrics.NormalizedEventGenerationTimeMs(partitionId), + stopWatch.ElapsedMilliseconds); + + stopWatch.Reset(); + + if (shouldLoop) + { + var measurement = enumerable.Current; + measurement.IngestionTimeUtc = evt.SystemProperties.EnqueuedTimeUtc; + currentValue = (partitionId, measurement); + } + else + { + break; + } + } + catch (Exception ex) + { + // Translate all Normalization Mapping exceptions into a common type for easy identification. + storeNormalizedException(ex); + shouldLoop = false; + } + + if (shouldLoop) + { + yield return currentValue; + } + } + } } } \ No newline at end of file diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs index bec58505..6e9e0411 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs @@ -24,6 +24,8 @@ private IomtMetricDefinition(string metricName) public static IomtMetricDefinition NormalizedEvent { get; } = new IomtMetricDefinition(nameof(NormalizedEvent)); + public static IomtMetricDefinition NormalizedEventGenerationTimeMs { get; } = new IomtMetricDefinition(nameof(NormalizedEventGenerationTimeMs)); + public static IomtMetricDefinition Measurement { get; } = new IomtMetricDefinition(nameof(Measurement)); public static IomtMetricDefinition MeasurementGroup { get; } = new IomtMetricDefinition(nameof(MeasurementGroup)); diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs index defc5c62..f180c42e 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs @@ -159,5 +159,16 @@ public static Metric HandledException(string exceptionName, string connectorStag { return exceptionName.ToErrorMetric(connectorStage, ErrorType.GeneralError, ErrorSeverity.Critical); } + + /// + /// The time it takes to generate a Normalized Event. + /// + /// The partition id of the events being consumed from the event hub partition + public static Metric NormalizedEventGenerationTimeMs(string partitionId = null) + { + return IomtMetricDefinition.NormalizedEventGenerationTimeMs + .CreateBaseMetric(Category.Traffic, ConnectorOperation.Normalization) + .AddDimension(_partitionDimension, partitionId); + } } } diff --git a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Microsoft.Health.Fhir.Ingest.UnitTests.csproj b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Microsoft.Health.Fhir.Ingest.UnitTests.csproj index 4b11a28f..4b8014b7 100644 --- a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Microsoft.Health.Fhir.Ingest.UnitTests.csproj +++ b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Microsoft.Health.Fhir.Ingest.UnitTests.csproj @@ -36,6 +36,7 @@ + diff --git a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs index 13f81810..7cf58cca 100644 --- a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs +++ b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs @@ -43,7 +43,7 @@ public async Task GivenMultipleEventsWithOneResultPer_WhenProcessAsync_ThenEachE _template.GetMeasurements(null).ReturnsForAnyArgs(new[] { Substitute.For() }); var events = Enumerable.Range(0, 10).Select(i => BuildEvent(i)).ToArray(); - var srv = new MeasurementEventNormalizationService(_logger, _template, _exceptionTelemetryProcessor); + var srv = new MeasurementEventNormalizationService(_logger, _template, _converter, _exceptionTelemetryProcessor, 3, 25); await srv.ProcessAsync(events, _consumer); _template.ReceivedWithAnyArgs(events.Length).GetMeasurements(null);