Skip to content

Commit

Permalink
Add correlation id as option for grouping measurements (#47)
Browse files Browse the repository at this point in the history
* Add correlation id support to measurements/groups & ObservationGroups/Factory

* Add correlation id to SA queries in collection query project and SA query property of ARM templates
* Add ability to extract correlation id to JsonPathContentTemplate
* Add tests for extraction logic
* Update exception in CorrelationObservationGroup to throw new CorrelationIdNotDefinedException with id not set
* Add CorrelationIdNotDefinedException to telemetry processor as a known non-recoverable error.
* Add unit tests for new observation groups and observation group factories.
* Add missing file header
* Add unit test for FhirTemplate to ensure default value for PeriodInterval hasn't changed.  This is to ensure no backwards breaking changes.
* FIx CorrelationMeasurementObservationGroupFactory not setting ObservationGroup name
* Add additional validations
* Refactor value processors to accept new interface IObservationData that includes data to be created and information about the existing observation.
* Add DateTime extensions for common FHIR operations on DateTime values.
* Add ability to update the period of observation if the merged values our outside the current period.
* Add unit tests for observation period update scenario.
* Fixed CodeValueFhirTemplateProcessor using new period instead of old period when merging data.  Old period will now be used and tests updated.
* Updated SampledDataFhirValueProcessor to use values from existing observation when merging sample data.  Tests updated.
* Update SampledDataFhirValueProcessor to use observation period instead of data period during create to future proof.  Right now they would always be the same.
* Fix issue in SampledDataProcessor were the last value was omitted from the stream if it was on the end boundary.  Never hit before because the hourly and daily time periods would end on the time boundary - 1 ms.
* Update documentation to include the new correlation id feature.
  • Loading branch information
dustinburson authored Jun 18, 2020
1 parent 9f7cf74 commit 4e29aa7
Show file tree
Hide file tree
Showing 51 changed files with 2,117 additions and 711 deletions.
2 changes: 1 addition & 1 deletion deploy/templates/consumption-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
2 changes: 1 addition & 1 deletion deploy/templates/default-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
2 changes: 1 addition & 1 deletion deploy/templates/managed-identity-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
2 changes: 1 addition & 1 deletion deploy/templates/premium-azuredeploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@
"name": "Transformation",
"properties": {
"streamingUnits": "[parameters('StreamingUnits')]",
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
"query": "[concat('SELECT \r\n DeviceId [DeviceId], \r\n PatientId [PatientId],\r\n EncounterId [EncounterId],\r\n CorrelationId [CorrelationId],\r\n collect() [Data],\r\n System.Timestamp [WindowTime],\r\n Type [MeasureType],\r\n count(*) [Count]\r\nINTO\r\n [FhirImportOutput]\r\nFROM\r\n [NormalizedData] PARTITION BY PartitionId TIMESTAMP BY OccurrenceTimeUtc\r\nGROUP BY PartitionId, \r\n DeviceId, \r\n PatientId, \r\n EncounterId, \r\n CorrelationId, \r\n Type, \r\n TUMBLINGWINDOW(', parameters('JobWindowUnit'), ', ', parameters('JobWindowMagnitude'), ')')]"
}
},
"functions": [
Expand Down
Loading

0 comments on commit 4e29aa7

Please sign in to comment.