Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add correlation id as option for grouping measurements #47

Merged
merged 13 commits into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/templates/consumption-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
2 changes: 1 addition & 1 deletion deploy/templates/default-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
2 changes: 1 addition & 1 deletion deploy/templates/managed-identity-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
2 changes: 1 addition & 1 deletion deploy/templates/premium-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;

namespace Microsoft.Health.Fhir.Ingest.Data
{
public class CorrelationIdNotDefinedException : Exception
{
public CorrelationIdNotDefinedException(string message)
: base(message)
{
}

public CorrelationIdNotDefinedException(string message, Exception innerException)
: base(message, innerException)
{
}

public CorrelationIdNotDefinedException()
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using EnsureThat;

namespace Microsoft.Health.Fhir.Ingest.Data
{
public class CorrelationMeasurementObservationGroup : SortedObservationGroup
{
private readonly string _correlationId;
private DateTime _start = DateTime.MaxValue;
private DateTime _end = DateTime.MinValue;

public CorrelationMeasurementObservationGroup(string correlationId)
{
if (string.IsNullOrWhiteSpace(correlationId))
{
throw new CorrelationIdNotDefinedException();
}

_correlationId = correlationId;
}

public override (DateTime Start, DateTime End) Boundary => (_start, _end);

public override void AddMeasurement(IMeasurement measurement)
{
EnsureArg.IsNotNull(measurement, nameof(measurement));

base.AddMeasurement(measurement);

if (measurement.OccurrenceTimeUtc < _start)
{
_start = measurement.OccurrenceTimeUtc;
}

if (measurement.OccurrenceTimeUtc > _end)
{
_end = measurement.OccurrenceTimeUtc;
}
}

public override string GetIdSegment()
{
return _correlationId;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Collections.Generic;
using EnsureThat;

namespace Microsoft.Health.Fhir.Ingest.Data
{
public class CorrelationMeasurementObservationGroupFactory : IObservationGroupFactory<IMeasurementGroup>
{
public IEnumerable<IObservationGroup> Build(IMeasurementGroup input)
{
EnsureArg.IsNotNull(input, nameof(input));
EnsureArg.IsNotNull(input.MeasureType, nameof(input.MeasureType));
EnsureArg.IsNotNull(input.Data, nameof(input.Data));

var observationGroup = new CorrelationMeasurementObservationGroup(input.CorrelationId)
{
Name = input.MeasureType,
};

foreach (var m in input.Data)
{
observationGroup.AddMeasurement(m);
}

yield return observationGroup;
}
}
}
2 changes: 2 additions & 0 deletions src/lib/Microsoft.Health.Fhir.Ingest/Data/IMeasurement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface IMeasurement

string EncounterId { get; }

string CorrelationId { get; }

IEnumerable<MeasurementProperty> Properties { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public interface IMeasurementGroup

string EncounterId { get; }

string CorrelationId { get; }

IEnumerable<IMeasurement> Data { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ public interface IObservationGroup
void AddMeasurement(IMeasurement measurement);

IDictionary<string, IEnumerable<(DateTime Time, string Value)>> GetValues();

string GetIdSegment();
}
}
3 changes: 2 additions & 1 deletion src/lib/Microsoft.Health.Fhir.Ingest/Data/Measurement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ public Measurement()

public string EncounterId { get; set; }

public string CorrelationId { get; set; }

#pragma warning disable CA2227
public IList<MeasurementProperty> Properties { get; set; }

#pragma warning restore CA2227

IEnumerable<MeasurementProperty> IMeasurement.Properties => Properties;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/Microsoft.Health.Fhir.Ingest/Data/MeasurementGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class MeasurementGroup : IMeasurementGroup

public string EncounterId { get; set; }

public string CorrelationId { get; set; }

public DateTime WindowTime { get; set; }

public string MeasureType { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,118 +5,34 @@

using System;
using System.Collections.Generic;
using System.Linq;
using EnsureThat;
using Microsoft.Health.Fhir.Ingest.Template;

namespace Microsoft.Health.Fhir.Ingest.Data
{
public class MeasurementObservationGroupFactory : IObservationGroupFactory<IMeasurementGroup>
{
private static readonly IDictionary<ObservationPeriodInterval, Func<DateTime, (DateTime Start, DateTime End)>> BoundaryFunctions = new Dictionary<ObservationPeriodInterval, Func<DateTime, (DateTime Start, DateTime End)>>
{
{ ObservationPeriodInterval.Single, GetSingleBoundary },
{ ObservationPeriodInterval.Hourly, GetHourlyBoundary },
{ ObservationPeriodInterval.Daily, GetDailyBoundary },
};

private readonly Func<DateTime, (DateTime Start, DateTime End)> _boundaryFunction;
private readonly IObservationGroupFactory<IMeasurementGroup> _internalFactory;

public MeasurementObservationGroupFactory(ObservationPeriodInterval period)
{
_boundaryFunction = BoundaryFunctions[period];
}

public IEnumerable<IObservationGroup> Build(IMeasurementGroup input)
{
EnsureArg.IsNotNull(input, nameof(input));

var lookup = new Dictionary<DateTime, IObservationGroup>();
foreach (var m in input.Data)
switch (period)
{
var boundary = GetBoundaryKey(m);
if (lookup.TryGetValue(boundary.Start, out var grp))
{
grp.AddMeasurement(m);
}
else
{
var newGrp = CreateObservationGroup(input, boundary);
newGrp.AddMeasurement(m);
lookup.Add(boundary.Start, newGrp);
}
case ObservationPeriodInterval.CorrelationId:
_internalFactory = new CorrelationMeasurementObservationGroupFactory();
break;
case ObservationPeriodInterval.Single:
case ObservationPeriodInterval.Hourly:
case ObservationPeriodInterval.Daily:
_internalFactory = new TimePeriodMeasurementObservationGroupFactory(period);
break;
default:
throw new NotSupportedException($"ObservationPeriodInterval {period} is not supported.");
}

return lookup.Values;
}

public virtual (DateTime Start, DateTime End) GetBoundaryKey(IMeasurement measurement)
{
EnsureArg.IsNotNull(measurement);

return _boundaryFunction(measurement.OccurrenceTimeUtc.ToUniversalTime());
}

public virtual IObservationGroup CreateObservationGroup(IMeasurementGroup group, (DateTime Start, DateTime End) boundary)
{
EnsureArg.IsNotNull(group, nameof(group));

return new MeasurementObservationGroup
{
Name = group.MeasureType,
Boundary = boundary,
};
}

private static (DateTime Start, DateTime End) GetSingleBoundary(DateTime utcDateTime)
{
return (utcDateTime, utcDateTime);
}

private static (DateTime Start, DateTime End) GetHourlyBoundary(DateTime utcDateTime)
{
var start = utcDateTime.Date.AddHours(utcDateTime.Hour);
var end = start.AddHours(1).AddTicks(-1);

return (start, end);
}

private static (DateTime Start, DateTime End) GetDailyBoundary(DateTime utcDateTime)
{
var start = utcDateTime.Date;
var end = start.AddDays(1).AddTicks(-1);

return (start, end);
}

private class MeasurementObservationGroup : IObservationGroup, IComparer<(DateTime Time, string Value)>
public IEnumerable<IObservationGroup> Build(IMeasurementGroup input)
{
private readonly Dictionary<string, SortedSet<(DateTime Time, string Value)>> _propertyTimeValues = new Dictionary<string, SortedSet<(DateTime Time, string Value)>>();

public string Name { get; set; }

public (DateTime Start, DateTime End) Boundary { get; set; }

public void AddMeasurement(IMeasurement measurement)
{
foreach (var mp in measurement.Properties)
{
if (!_propertyTimeValues.TryGetValue(mp.Name, out SortedSet<(DateTime Time, string Value)> values))
{
values = new SortedSet<(DateTime Time, string Value)>(this);
_propertyTimeValues.Add(mp.Name, values);
}

values.Add((measurement.OccurrenceTimeUtc, mp.Value));
}
}

public int Compare((DateTime Time, string Value) x, (DateTime Time, string Value) y)
{
return DateTime.Compare(x.Time, y.Time);
}

public IDictionary<string, IEnumerable<(DateTime Time, string Value)>> GetValues() => _propertyTimeValues.ToDictionary(m => m.Key, m => (IEnumerable<(DateTime Time, string Value)>)m.Value);
return _internalFactory.Build(input);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using EnsureThat;

namespace Microsoft.Health.Fhir.Ingest.Data
{
public abstract class SortedObservationGroup : IObservationGroup, IComparer<(DateTime Time, string Value)>
{
private readonly Dictionary<string, SortedSet<(DateTime Time, string Value)>> _propertyTimeValues = new Dictionary<string, SortedSet<(DateTime Time, string Value)>>();

public virtual string Name { get; set; }

public abstract (DateTime Start, DateTime End) Boundary { get; }

protected IDictionary<string, SortedSet<(DateTime Time, string Value)>> PropertyTimeValues => _propertyTimeValues;

public virtual void AddMeasurement(IMeasurement measurement)
{
EnsureArg.IsNotNull(measurement);

foreach (var mp in measurement.Properties)
{
if (!_propertyTimeValues.TryGetValue(mp.Name, out SortedSet<(DateTime Time, string Value)> values))
{
values = new SortedSet<(DateTime Time, string Value)>(this);
_propertyTimeValues.Add(mp.Name, values);
}

values.Add((measurement.OccurrenceTimeUtc, mp.Value));
}
}

public abstract string GetIdSegment();

public virtual IDictionary<string, IEnumerable<(DateTime Time, string Value)>> GetValues() => _propertyTimeValues.ToDictionary(m => m.Key, m => (IEnumerable<(DateTime Time, string Value)>)m.Value);

public int Compare((DateTime Time, string Value) x, (DateTime Time, string Value) y)
{
return DateTime.Compare(x.Time, y.Time);
}
}
}
Loading