diff --git a/Microsoft.Health.Fhir.Ingest.sln b/Microsoft.Health.Fhir.Ingest.sln index 18b41eeb..e16e95b7 100644 --- a/Microsoft.Health.Fhir.Ingest.sln +++ b/Microsoft.Health.Fhir.Ingest.sln @@ -73,9 +73,17 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "healthkitOnFhir", "healthki EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Template", "src\lib\Microsoft.Health.Fhir.Ingest.Template\Microsoft.Health.Fhir.Ingest.Template.csproj", "{85D653A7-0E63-4751-8904-728156807A14}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Ingest.Schema", "src\lib\Microsoft.Health.Fhir.Ingest.Schema\Microsoft.Health.Fhir.Ingest.Schema.csproj", "{A85AB6BC-698C-460F-81D4-9D1D2BD14B71}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Schema", "src\lib\Microsoft.Health.Fhir.Ingest.Schema\Microsoft.Health.Fhir.Ingest.Schema.csproj", "{A85AB6BC-698C-460F-81D4-9D1D2BD14B71}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Ingest.Template.UnitTests", "test\Microsoft.Health.Fhir.Ingest.Template.UnitTests\Microsoft.Health.Fhir.Ingest.Template.UnitTests.csproj", "{EE072537-807D-4FE2-BFEB-424B64DCD7F9}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Template.UnitTests", "test\Microsoft.Health.Fhir.Ingest.Template.UnitTests\Microsoft.Health.Fhir.Ingest.Template.UnitTests.csproj", "{EE072537-807D-4FE2-BFEB-424B64DCD7F9}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Events", "src\lib\Microsoft.Health.Events\Microsoft.Health.Events.csproj", "{22275DE3-859D-40F0-9547-7711568164C0}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "console", "console", "{1EF3584A-C437-4B45-8BF8-1597D5A8DBC7}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Console", "src\console\Microsoft.Health.Fhir.Ingest.Console.csproj", "{927BC214-ABD9-4A1B-9F7C-75973513D141}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Logging", "src\lib\Microsoft.Health.Logger\Microsoft.Health.Logging.csproj", "{05123BAE-E96E-4C7E-95CB-C616DF940F17}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -147,6 +155,18 @@ Global {EE072537-807D-4FE2-BFEB-424B64DCD7F9}.Debug|Any CPU.Build.0 = Debug|Any CPU {EE072537-807D-4FE2-BFEB-424B64DCD7F9}.Release|Any CPU.ActiveCfg = Release|Any CPU {EE072537-807D-4FE2-BFEB-424B64DCD7F9}.Release|Any CPU.Build.0 = Release|Any CPU + {22275DE3-859D-40F0-9547-7711568164C0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {22275DE3-859D-40F0-9547-7711568164C0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {22275DE3-859D-40F0-9547-7711568164C0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {22275DE3-859D-40F0-9547-7711568164C0}.Release|Any CPU.Build.0 = Release|Any CPU + {927BC214-ABD9-4A1B-9F7C-75973513D141}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {927BC214-ABD9-4A1B-9F7C-75973513D141}.Debug|Any CPU.Build.0 = Debug|Any CPU + {927BC214-ABD9-4A1B-9F7C-75973513D141}.Release|Any CPU.ActiveCfg = Release|Any CPU + {927BC214-ABD9-4A1B-9F7C-75973513D141}.Release|Any CPU.Build.0 = Release|Any CPU + {05123BAE-E96E-4C7E-95CB-C616DF940F17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {05123BAE-E96E-4C7E-95CB-C616DF940F17}.Debug|Any CPU.Build.0 = Debug|Any CPU + {05123BAE-E96E-4C7E-95CB-C616DF940F17}.Release|Any CPU.ActiveCfg = Release|Any CPU + {05123BAE-E96E-4C7E-95CB-C616DF940F17}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -172,6 +192,9 @@ Global {85D653A7-0E63-4751-8904-728156807A14} = {513D67B4-80E1-476D-955F-E7E7C79D144A} {A85AB6BC-698C-460F-81D4-9D1D2BD14B71} = {513D67B4-80E1-476D-955F-E7E7C79D144A} {EE072537-807D-4FE2-BFEB-424B64DCD7F9} = {FAF8B402-892E-4EA2-B4CF-69B0C70BA762} + {22275DE3-859D-40F0-9547-7711568164C0} = {513D67B4-80E1-476D-955F-E7E7C79D144A} + {927BC214-ABD9-4A1B-9F7C-75973513D141} = {1EF3584A-C437-4B45-8BF8-1597D5A8DBC7} + {05123BAE-E96E-4C7E-95CB-C616DF940F17} = {513D67B4-80E1-476D-955F-E7E7C79D144A} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A358924D-F948-4AE8-8CD0-A0F56225CE0C} diff --git a/deploy/scripts/Create-IomtWebJobsEnvironment.ps1 b/deploy/scripts/Create-IomtWebJobsEnvironment.ps1 new file mode 100644 index 00000000..e0bb19b2 --- /dev/null +++ b/deploy/scripts/Create-IomtWebJobsEnvironment.ps1 @@ -0,0 +1,113 @@ +<# +.SYNOPSIS +Creates a new IoMT FHIR Connector for Azure without using Stream Analytics +.DESCRIPTION +#> +param +( + [Parameter(Mandatory = $true)] + [string]$ResourceGroup, + + [Parameter(Mandatory = $true)] + [ValidateNotNullOrEmpty()] + [ValidateLength(5,12)] + [ValidateScript({ + if ("$_" -cmatch "(^([a-z]|\d)+$)") { + return $true + } + else { + throw "Environment name must be lowercase and numbers" + return $false + } + })] + [string]$EnvironmentName, + + [Parameter(Mandatory = $false)] + [ValidateSet('South Africa North', 'South Africa West', 'East Asia', 'Southeast Asia', 'Australia Central', 'Australia Central 2', 'Australia East', 'Australia Southeast', 'Brazil South', 'Brazil Southeast', 'Canada Central', 'Canada East', 'China East', 'China East 2', 'China North', 'China North 2', 'North Europe', 'West Europe', 'France Central', 'France South', 'Germany Central', 'Germany Northeast', 'Germany West Central', 'Central India', 'South India', 'West India', 'Japan East', 'Japan West', 'Korea Central', 'Korea South', 'Norway East', 'Switzerland North', 'Switzerland West', 'UAE Central', 'UAE North', 'UK West', 'UK South', 'Central US', 'East US', 'East US 2', 'North Central US', 'South Central US', 'West Central US', 'West US', 'West US 2')] + [string]$EnvironmentLocation = "North Central US", + [Parameter(Mandatory = $false)] + [ValidateSet('R4')] + [string]$FhirVersion = "R4", + + [Parameter(Mandatory = $false)] + [string]$SourceRepository = "https://github.com/microsoft/iomt-fhir", + + [Parameter(Mandatory = $false)] + [string]$SourceRevision = "master", + + [Parameter(Mandatory = $true)] + [string]$FhirServiceUrl, + + [Parameter(Mandatory = $true)] + [string]$FhirServiceAuthority, + + [Parameter(Mandatory = $true)] + [string]$FhirServiceClientId, + + [Parameter(Mandatory = $true)] + [string]$FhirServiceSecret, + + [Parameter(Mandatory = $false)] + [string]$EnvironmentDeploy = $true +) + +Function BuildPackage() { + try { + Push-Location $currentPath + cd ../../src/console/ + dotnet restore + dotnet build --output $buildPath /p:DeployOnBuild=true /p:DeployTarget=Package + } finally { + Pop-Location + } +} + +Function Deploy-WebJobs($DeviceDataWebJobName, $NormalizedDataWebJobName) { + try { + $tempPath = "$currentPath\Temp" + $webAppName = $EnvironmentName + $webJobType = "Continuous" + + Clear-Path -WebJobName $DeviceDataWebJobName + Clear-Path -WebJobName $NormalizedDataWebJobName + + $DeviceWebJobPath = "$tempPath\App_Data\jobs\$webJobType\$DeviceDataWebJobName" + $NormalizedWebJobPath = "$tempPath\App_Data\jobs\$webJobType\$NormalizedDataWebJobName" + Copy-Item "$buildPath\*" -Destination $DeviceWebJobPath -Recurse + Copy-Item "$buildPath\*" -Destination $NormalizedWebJobPath -Recurse + + Compress-Archive -Path "$tempPath\*" -DestinationPath "$currentPath\iomtwebjobs.zip" -Force + + Publish-AzWebApp -ArchivePath "$currentPath\iomtwebjobs.zip" -ResourceGroupName $ResourceGroup -Name $webAppName + } finally { + Pop-Location + } +} + +Function Clear-Path($WebJobName) { + $WebJobPath = "$tempPath\App_Data\jobs\$webJobType\$WebJobName" + Get-ChildItem -Path $WebJobPath -Recurse | Remove-Item -Force -Recurse + if( -Not (Test-Path -Path $WebJobPath ) ) + { + New-Item $WebJobPath -ItemType Directory + } +} + +Set-StrictMode -Version Latest +$ErrorActionPreference = "Stop" + +# deploy event hubs, app service, key vaults, storage +if ($EnvironmentDeploy -eq $true) { + Write-Host "Deploying environment resources..." + $webjobTemplate = "..\templates\default-azuredeploy-webjobs.json" + New-AzResourceGroupDeployment -TemplateFile $webjobTemplate -ResourceGroupName $ResourceGroup -ServiceName $EnvironmentName -FhirServiceUrl $fhirServiceUrl -FhirServiceAuthority $FhirServiceAuthority -FhirServiceResource $fhirServiceUrl -FhirServiceClientId $FhirServiceClientId -FhirServiceClientSecret (ConvertTo-SecureString -String $FhirServiceSecret -AsPlainText -Force) -RepositoryUrl $SourceRepository -RepositoryBranch $SourceRevision -ResourceLocation $EnvironmentLocation +} + +# deploy the stream analytics replacement webjobs +Write-Host "Deploying WebJobs..." + +$currentPath = (Get-Location).Path +$buildPath = "$currentPath\OSS_Deployment" +BuildPackage +Deploy-WebJobs -DeviceDataWebJobName "devicedata" -NormalizedDataWebJobName "normalizeddata" + diff --git a/deploy/templates/default-azuredeploy-webjobs.json b/deploy/templates/default-azuredeploy-webjobs.json new file mode 100644 index 00000000..19b11e10 --- /dev/null +++ b/deploy/templates/default-azuredeploy-webjobs.json @@ -0,0 +1,746 @@ +{ + "$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "ServiceName": { + "type": "string", + "minLength": 3, + "maxLength": 20, + "metadata": { + "description": "Name for the service(s) being deployed. Name will applied to all relevant services being created." + } + }, + "RepositoryUrl": { + "type": "string", + "defaultValue": "https://github.com/Microsoft/iomt-fhir", + "metadata": { + "description": "Repository to pull source code from. If blank, source code will not be deployed." + } + }, + "RepositoryBranch": { + "type": "string", + "defaultValue": "master", + "metadata": { + "description": "Source code branch to deploy." + } + }, + "JobWindowUnit": { + "type": "string", + "allowedValues": [ + "SECOND" + ], + "metadata": { + "description": "The time period to collect events before sending them to the FHIR server." + }, + "defaultValue": "SECOND" + }, + "JobWindowMagnitude": { + "type": "int", + "minValue": 1, + "maxValue": 86400, + "metadata": { + "description": "The magnitude of time period to collect events before sending them to the FHIR server." + }, + "defaultValue": 60 + }, + "JobMaxEvents": { + "type": "int", + "minValue": 1, + "maxValue": 1000, + "metadata": { + "description": "The maximum number of events to collect before sending them to the FHIR server." + }, + "defaultValue": 500 + }, + "ThroughputUnits": { + "type": "int", + "minValue": 1, + "maxValue": 20, + "metadata": { + "description": "The throughput units reserved for the Event Hubs created." + }, + "defaultValue": 1 + }, + "AppServicePlanSku": { + "type": "string", + "allowedValues": [ + "F1", + "D1", + "B1", + "B2", + "B3", + "S1", + "S2", + "S3", + "P1", + "P2", + "P3", + "P4" + ], + "defaultValue": "S1", + "metadata": { + "description": "The app service plan tier to use for hosting the required Azure Functions." + } + }, + "ResourceLocation": { + "type": "string", + "allowedValues": [ + "South Africa North", + "South Africa West", + "East Asia", + "Southeast Asia", + "Australia Central", + "Australia Central 2", + "Australia East", + "Australia Southeast", + "Brazil South", + "Brazil Southeast", + "Canada Central", + "Canada East", + "China East", + "China East 2", + "China North", + "China North 2", + "North Europe", + "West Europe", + "France Central", + "France South", + "Germany Central", + "Germany Northeast", + "Germany West Central", + "Central India", + "South India", + "West India", + "Japan East", + "Japan West", + "Korea Central", + "Korea South", + "Norway East", + "Switzerland North", + "Switzerland West", + "UAE Central", + "UAE North", + "UK West", + "UK South", + "Central US", + "East US", + "East US 2", + "North Central US", + "South Central US", + "West Central US", + "West US", + "West US 2" + ], + "metadata": { + "description": "The location of the deployed resources." + } + }, + "FhirServiceUrl": { + "type": "string", + "metadata": { + "description": "Url of the FHIR server that IoMT will be written to." + } + }, + "FhirServiceAuthority": { + "type": "string", + "metadata": { + "description": "Authority of the FHIR to retrieve a token against." + } + }, + "FhirServiceResource": { + "type": "string", + "metadata": { + "description": "Resource/Audience representing the FHIR server on the provided authority." + } + }, + "FhirServiceClientId": { + "type": "string", + "metadata": { + "description": "Client Id to run services as for access to the FHIR server." + } + }, + "FhirServiceClientSecret": { + "type": "securestring", + "metadata": { + "description": "Client secret of the application for accessing a token." + } + }, + "FhirVersion": { + "type": "string", + "defaultValue": "R4", + "metadata": { + "description": "FHIR Version that the FHIR Server supports" + } + }, + "ResourceIdentityResolutionType": { + "type": "string", + "allowedValues": [ + "Lookup", + "Create", + "LookupWithEncounter" + ], + "defaultValue": "Lookup", + "metadata": { + "description": "Configures how patient, device, and other FHIR resource identities are resolved from the ingested data stream." + } + }, + "DefaultDeviceIdentifierSystem": { + "type": "string", + "defaultValue": "", + "metadata": { + "description": "Default system to use when searching for device identities. If empty system is not used in the search." + } + } + }, + "variables": { + "asa_job_name": "[parameters('ServiceName')]", + "eventhub_namespace_name": "[parameters('ServiceName')]", + "normalizeddata_eventhub_name": "[concat(variables('eventhub_namespace_name'), '/normalizeddata')]", + "devicedata_eventhub_name": "[concat(variables('eventhub_namespace_name'), '/devicedata')]", + "storage_account_name": "[parameters('ServiceName')]", + "app_plan_name": "[concat(parameters('ServiceName'), 'plan')]", + "app_service_name": "[parameters('ServiceName')]", + "app_insights_name": "[parameters('ServiceName')]", + "key_vault_name": "[parameters('ServiceName')]", + "app_service_resource_id": "[resourceId('Microsoft.Web/sites', variables('app_service_name'))]", + "deploy_source_code": "[and(not(empty(parameters('repositoryUrl'))),not(empty(parameters('repositoryBranch'))))]", + "sender_role": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', '2b629674-e913-4c01-ae53-ef4638d8f975')]", + "receiver_role": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde')]" + }, + "resources": [ + { + "type": "Microsoft.EventHub/namespaces", + "apiVersion": "2017-04-01", + "name": "[variables('eventhub_namespace_name')]", + "location": "[parameters('ResourceLocation')]", + "tags": { + "IomtFhirConnector": "[parameters('ResourceIdentityResolutionType')]" + }, + "sku": { + "name": "Standard", + "tier": "Standard", + "capacity": "[parameters('ThroughputUnits')]" + }, + "properties": { + "zoneRedundant": true, + "isAutoInflateEnabled": false, + "maximumThroughputUnits": 0, + "kafkaEnabled": false + } + }, + { + "type": "Microsoft.EventHub/namespaces/AuthorizationRules", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/RootManageSharedAccessKey')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'devicedata')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'normalizeddata')]" + ], + "properties": { + "rights": [ + "Listen", + "Manage", + "Send" + ] + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/devicedata')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]" + ], + "properties": { + "messageRetentionInDays": 1, + "partitionCount": 32, + "status": "Active" + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/normalizeddata')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]" + ], + "properties": { + "messageRetentionInDays": 1, + "partitionCount": 32, + "status": "Active" + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs/authorizationRules", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/devicedata/reader')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'devicedata')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'normalizeddata')]", + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/AuthorizationRules', variables('eventhub_namespace_name'), 'RootManageSharedAccessKey')]" + ], + "properties": { + "rights": [ + "Listen" + ] + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs/authorizationRules", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/devicedata/writer')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'devicedata')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'normalizeddata')]", + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/AuthorizationRules', variables('eventhub_namespace_name'), 'RootManageSharedAccessKey')]" + ], + "properties": { + "rights": [ + "Send" + ] + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs/authorizationRules", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/normalizeddata/reader')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'devicedata')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'normalizeddata')]", + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/AuthorizationRules', variables('eventhub_namespace_name'), 'RootManageSharedAccessKey')]" + ], + "properties": { + "rights": [ + "Listen" + ] + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs/authorizationRules", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/normalizeddata/writer')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'devicedata')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'normalizeddata')]", + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/AuthorizationRules', variables('eventhub_namespace_name'), 'RootManageSharedAccessKey')]" + ], + "properties": { + "rights": [ + "Send", + "Listen" + ] + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs/consumergroups", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/devicedata/$Default')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'devicedata')]", + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'devicedata', 'reader')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'devicedata', 'writer')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'normalizeddata', 'reader')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'normalizeddata', 'writer')]" + ], + "properties": { + } + }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs/consumergroups", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventhub_namespace_name'), '/normalizeddata/$Default')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventhub_namespace_name'), 'normalizeddata')]", + "[resourceId('Microsoft.EventHub/namespaces', variables('eventhub_namespace_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'devicedata', 'reader')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'devicedata', 'writer')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'normalizeddata', 'reader')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'normalizeddata', 'writer')]" + ], + "properties": { + } + }, + { + "type": "Microsoft.Storage/storageAccounts", + "apiVersion": "2019-04-01", + "name": "[variables('storage_account_name')]", + "location": "[parameters('ResourceLocation')]", + "tags": { + "IomtFhirConnector": "[parameters('ResourceIdentityResolutionType')]" + }, + "sku": { + "name": "Standard_RAGRS", + "tier": "Standard" + }, + "kind": "StorageV2", + "properties": { + "networkAcls": { + "bypass": "AzureServices", + "virtualNetworkRules": [ + ], + "ipRules": [ + ], + "defaultAction": "Allow" + }, + "supportsHttpsTrafficOnly": true, + "encryption": { + "services": { + "file": { + "enabled": true + }, + "blob": { + "enabled": true + } + }, + "keySource": "Microsoft.Storage" + }, + "accessTier": "Hot" + } + }, + { + "type": "Microsoft.Storage/storageAccounts/blobServices", + "apiVersion": "2019-04-01", + "name": "[concat(variables('storage_account_name'), '/default')]", + "dependsOn": [ + "[resourceId('Microsoft.Storage/storageAccounts', variables('storage_account_name'))]" + ], + "properties": { + "cors": { + "corsRules": [ + ] + }, + "deleteRetentionPolicy": { + "enabled": false + } + } + }, + { + "type": "Microsoft.Storage/storageAccounts/blobServices/containers", + "apiVersion": "2019-04-01", + "name": "[concat(variables('storage_account_name'), '/default/template')]", + "dependsOn": [ + "[resourceId('Microsoft.Storage/storageAccounts/blobServices', variables('storage_account_name'), 'default')]", + "[resourceId('Microsoft.Storage/storageAccounts', variables('storage_account_name'))]" + ], + "properties": { + "publicAccess": "None" + } + }, + { + "type": "Microsoft.Storage/storageAccounts/blobServices/containers", + "apiVersion": "2019-04-01", + "name": "[concat(variables('storage_account_name'), '/default/checkpoint')]", + "dependsOn": [ + "[resourceId('Microsoft.Storage/storageAccounts/blobServices', variables('storage_account_name'), 'default')]", + "[resourceId('Microsoft.Storage/storageAccounts', variables('storage_account_name'))]" + ], + "properties": { + "publicAccess": "None" + } + }, + { + "type": "Microsoft.Web/serverfarms", + "apiVersion": "2016-09-01", + "name": "[variables('app_plan_name')]", + "location": "[parameters('ResourceLocation')]", + "tags": { + "IomtFhirConnector": "[variables('app_plan_name')]", + "IomtFhirVersion": "[parameters('FhirVersion')]" + }, + "sku": { + "name": "[parameters('AppServicePlanSku')]" + }, + "kind": "app", + "properties": { + "name": "[variables('app_plan_name')]", + "perSiteScaling": false, + "reserved": false, + "targetWorkerCount": 0, + "targetWorkerSizeId": 0 + } + }, + { + "type": "Microsoft.Web/sites", + "apiVersion": "2018-11-01", + "name": "[variables('app_service_name')]", + "location": "[parameters('ResourceLocation')]", + "kind": "app", + "identity": { + "type": "SystemAssigned" + }, + "properties": { + "enabled": true, + "hostNameSslStates": [ + { + "name": "[concat(variables('app_service_name'), '.azurewebsites.net')]", + "sslState": "Disabled", + "hostType": "Standard" + }, + { + "name": "[concat(variables('app_service_name'), '.scm.azurewebsites.net')]", + "sslState": "Disabled", + "hostType": "Repository" + } + ], + "serverFarmId": "[resourceId('Microsoft.Web/serverfarms', variables('app_plan_name'))]", + "reserved": false, + "isXenon": false, + "hyperV": false, + "siteConfig": { + "alwaysOn": true + }, + "scmSiteAlsoStopped": false, + "clientAffinityEnabled": true, + "clientCertEnabled": false, + "hostNamesDisabled": false, + "containerSize": 0, + "dailyMemoryTimeQuota": 0, + "httpsOnly": false, + "redundancyMode": "None" + }, + "resources": [ + { + "name": "appsettings", + "dependsOn": [ + "[variables('app_service_resource_id')]", + "[resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-url')]", + "[resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-authority')]", + "[resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-resource')]", + "[resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-clientid')]", + "[resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-clientsecret')]" + ], + "type": "config", + "apiVersion": "2016-08-01", + "properties": { + "EventBatching:FlushTimespan": "[parameters('JobWindowMagnitude')]", + "EventBatching:MaxEvents": "[parameters('JobMaxEvents')]", + "Storage:BlobStorageConnectionString": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'blob-storage-cs'), '2018-02-14').secretUriWithVersion, ')')]", + "Storage:BlobContainerName": "checkpoint", + "Storage:BlobPrefix": "", + "Storage:CheckpointBatchCount": 5, + "TemplateStorage:BlobStorageConnectionString": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'blob-storage-cs'), '2018-02-14').secretUriWithVersion, ')')]", + "TemplateStorage:BlobContainerName": "template", + "InputEventHub": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'device-input-connection'), '2018-02-14').secretUriWithVersion, ')')]", + "OutputEventHub": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'device-output-connection'), '2018-02-14').secretUriWithVersion, ')')]", + "FhirVersion": "[parameters('FhirVersion')]", + "FhirService:Url": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-url'), '2018-02-14').secretUriWithVersion, ')')]", + "FhirService:Authority": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-authority'), '2018-02-14').secretUriWithVersion, ')')]", + "FhirService:Resource": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-resource'), '2018-02-14').secretUriWithVersion, ')')]", + "FhirService:ClientId": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-clientid'), '2018-02-14').secretUriWithVersion, ')')]", + "FhirService:ClientSecret": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-clientsecret'), '2018-02-14').secretUriWithVersion, ')')]", + "APPINSIGHTS_INSTRUMENTATIONKEY": "[reference(concat('Microsoft.Insights/components/', variables('app_insights_name'))).InstrumentationKey]", + "Template:DeviceContent": "devicecontent.json", + "Template:FhirMapping": "fhirmapping.json", + "ResourceIdentity:ResourceIdentityResolutionType": "[parameters('ResourceIdentityResolutionType')]", + "ResourceIdentity:DefaultDeviceIdentifierSystem": "[parameters('DefaultDeviceIdentifierSystem')]" + } + } + ] + }, + { + "type": "microsoft.insights/components", + "apiVersion": "2015-05-01", + "name": "[variables('app_insights_name')]", + "location": "[parameters('ResourceLocation')]", + "tags": { + "IomtFhirConnector": "[parameters('ResourceIdentityResolutionType')]" + }, + "kind": "web", + "properties": { + "Application_Type": "web", + "Flow_Type": "Redfield", + "Request_Source": "IbizaAIExtension" + } + }, + { + "type": "Microsoft.KeyVault/vaults", + "apiVersion": "2016-10-01", + "name": "[variables('key_vault_name')]", + "location": "[parameters('ResourceLocation')]", + "tags": { + "IomtFhirConnector": "[parameters('ResourceIdentityResolutionType')]" + }, + "dependsOn": [ + "[variables('app_service_resource_id')]" + ], + "properties": { + "sku": { + "family": "A", + "name": "Standard" + }, + "tenantId": "[subscription().tenantId]", + "accessPolicies": [ + { + "tenantId": "[reference(variables('app_service_resource_id'), '2015-08-01', 'Full').Identity.tenantId]", + "objectId": "[reference(variables('app_service_resource_id'), '2015-08-01', 'Full').Identity.principalId]", + "permissions": { + "keys": [ + ], + "secrets": [ + "Get", + "List" + ], + "certificates": [ + ] + } + } + ], + "enabledForDeployment": false, + "enabledForDiskEncryption": false, + "enabledForTemplateDeployment": false + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('storage_account_name'), '/blob-storage-cs')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]", + "[resourceId('Microsoft.Storage/storageAccounts/blobServices', variables('storage_account_name'), 'default')]" + ], + "properties": { + "contentType": "text/plain", + "value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storage_account_name'), ';AccountKey=', listkeys(resourceId('Microsoft.Storage/storageAccounts', variables('storage_account_name')), '2019-04-01').keys[0].value)]", + "attributes": { + "enabled": true + } + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('key_vault_name'), '/fhirserver-authority')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]" + ], + "properties": { + "contentType": "text/plain", + "value": "[parameters('FhirServiceAuthority')]", + "attributes": { + "enabled": true + } + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('key_vault_name'), '/fhirserver-clientid')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]" + ], + "properties": { + "contentType": "text/plain", + "value": "[parameters('FhirServiceClientId')]", + "attributes": { + "enabled": true + } + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('key_vault_name'), '/fhirserver-clientsecret')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]" + ], + "properties": { + "contentType": "text/plain", + "value": "[parameters('FhirServiceClientSecret')]", + "attributes": { + "enabled": true + } + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('key_vault_name'), '/fhirserver-resource')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]" + ], + "properties": { + "contentType": "text/plain", + "value": "[parameters('FhirServiceResource')]", + "attributes": { + "enabled": true + } + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('key_vault_name'), '/fhirserver-url')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]" + ], + "properties": { + "contentType": "text/plain", + "value": "[parameters('FhirServiceUrl')]", + "attributes": { + "enabled": true + } + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('key_vault_name'), '/device-input-connection')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'devicedata', 'reader')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/consumergroups', variables('eventhub_namespace_name'), 'devicedata', '$Default')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/consumergroups', variables('eventhub_namespace_name'), 'normalizeddata', '$Default')]" + ], + "properties": { + "contentType": "text/plain", + "value": "[listkeys(resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'devicedata', 'reader'), '2017-04-01').primaryConnectionString]", + "attributes": { + "enabled": true + } + } + }, + { + "type": "Microsoft.KeyVault/vaults/secrets", + "apiVersion": "2016-10-01", + "name": "[concat(variables('key_vault_name'), '/device-output-connection')]", + "location": "[parameters('ResourceLocation')]", + "dependsOn": [ + "[resourceId('Microsoft.KeyVault/vaults', variables('key_vault_name'))]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'normalizeddata', 'writer')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/consumergroups', variables('eventhub_namespace_name'), 'devicedata', '$Default')]", + "[resourceId('Microsoft.EventHub/namespaces/eventhubs/consumergroups', variables('eventhub_namespace_name'), 'normalizeddata', '$Default')]" + ], + "properties": { + "contentType": "text/plain", + "value": "[listkeys(resourceId('Microsoft.EventHub/namespaces/eventhubs/authorizationRules', variables('eventhub_namespace_name'), 'normalizeddata', 'writer'), '2017-04-01').primaryConnectionString]", + "attributes": { + "enabled": true + } + } + } + ], + "outputs": { + } +} diff --git a/src/console/IomtLogger.cs b/src/console/IomtLogger.cs new file mode 100644 index 00000000..215f52b0 --- /dev/null +++ b/src/console/IomtLogger.cs @@ -0,0 +1,34 @@ +using EnsureThat; +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Extensibility; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Health.Logging.Telemetry; + +namespace Microsoft.Health.Fhir.Ingest.Console +{ + public class IomtLogger + { + public IomtLogger(IConfiguration configuration) + { + Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + public void ConfigureServices(IServiceCollection services) + { + EnsureArg.IsNotNull(services, nameof(services)); + + var instrumentationKey = Configuration.GetSection("APPINSIGHTS_INSTRUMENTATIONKEY").Value; + + services.TryAddSingleton(sp => + { + var config = new TelemetryConfiguration(instrumentationKey); + var telemetryClient = new TelemetryClient(config); + return new IomtTelemetryLogger(telemetryClient); + }); + } + } +} diff --git a/src/console/MeasurementCollectionToFhir/Processor.cs b/src/console/MeasurementCollectionToFhir/Processor.cs new file mode 100644 index 00000000..fee4369e --- /dev/null +++ b/src/console/MeasurementCollectionToFhir/Processor.cs @@ -0,0 +1,48 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using EnsureThat; +using Microsoft.Azure.WebJobs; +using Microsoft.Health.Events.EventConsumers; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Fhir.Ingest.Console.Template; +using Microsoft.Health.Fhir.Ingest.Host; +using Microsoft.Health.Fhir.Ingest.Service; +using Microsoft.Health.Logging.Telemetry; + +namespace Microsoft.Health.Fhir.Ingest.Console.MeasurementCollectionToFhir +{ + public class Processor : IEventConsumer + { + private ITemplateManager _templateManager; + private MeasurementFhirImportService _measurementImportService; + private string _templateDefinition; + private ITelemetryLogger _logger; + + public Processor( + [Blob("template/%Template:FhirMapping%", FileAccess.Read)] string templateDefinition, + ITemplateManager templateManager, + [MeasurementFhirImport] MeasurementFhirImportService measurementImportService, + ITelemetryLogger logger) + { + _templateDefinition = templateDefinition; + _templateManager = templateManager; + _measurementImportService = measurementImportService; + _logger = logger; + } + + public async Task ConsumeAsync(IEnumerable events) + { + EnsureArg.IsNotNull(events); + EnsureArg.IsNotNull(_templateDefinition); + + var templateContent = _templateManager.GetTemplateAsString(_templateDefinition); + await _measurementImportService.ProcessEventsAsync(events, templateContent, _logger).ConfigureAwait(false); + } + } +} diff --git a/src/console/MeasurementCollectionToFhir/ProcessorStartup.cs b/src/console/MeasurementCollectionToFhir/ProcessorStartup.cs new file mode 100644 index 00000000..00f69fe3 --- /dev/null +++ b/src/console/MeasurementCollectionToFhir/ProcessorStartup.cs @@ -0,0 +1,73 @@ +using EnsureThat; +using Hl7.Fhir.Model; +using Hl7.Fhir.Rest; +using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Microsoft.Health.Common; +using Microsoft.Health.Extensions.Fhir; +using Microsoft.Health.Extensions.Fhir.Config; +using Microsoft.Health.Fhir.Ingest.Config; +using Microsoft.Health.Fhir.Ingest.Host; +using Microsoft.Health.Fhir.Ingest.Service; +using Microsoft.Health.Fhir.Ingest.Template; +using System; +using System.Linq; + +namespace Microsoft.Health.Fhir.Ingest.Console.MeasurementCollectionToFhir +{ + public class ProcessorStartup + { + public ProcessorStartup(IConfiguration configuration) + { + Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + public void ConfigureServices(IServiceCollection services) + { + Configuration.GetSection("FhirService") + .GetChildren() + .ToList() + .ForEach(env => Environment.SetEnvironmentVariable(env.Path, env.Value)); + + services.Configure(Configuration.GetSection("ResourceIdentity")); + services.Configure(Configuration.GetSection("FhirClient")); + + services.TryAddSingleton, FhirClientFactory>(); + services.TryAddSingleton(sp => sp.GetRequiredService>().Create()); + services.TryAddSingleton, Observation>, R4FhirLookupTemplateProcessor>(); + services.TryAddSingleton(ResolveResourceIdentityService); + services.TryAddSingleton(sp => new MemoryCache(Options.Create(new MemoryCacheOptions { SizeLimit = 5000 }))); + services.TryAddSingleton(); + + services.TryAddSingleton(); + services.TryAddSingleton(); + services.TryAddSingleton(ResolveMeasurementImportProvider); + } + + private MeasurementFhirImportProvider ResolveMeasurementImportProvider(IServiceProvider serviceProvider) + { + EnsureArg.IsNotNull(serviceProvider, nameof(serviceProvider)); + + IOptions options = Options.Create(new MeasurementFhirImportOptions()); + var logger = new LoggerFactory(); + var measurementImportService = new MeasurementFhirImportProvider(Configuration, options, logger, serviceProvider); + + return measurementImportService; + } + + private static IResourceIdentityService ResolveResourceIdentityService(IServiceProvider serviceProvider) + { + EnsureArg.IsNotNull(serviceProvider, nameof(serviceProvider)); + + var fhirClient = serviceProvider.GetRequiredService(); + var resourceIdentityOptions = serviceProvider.GetRequiredService>(); + return ResourceIdentityServiceFactory.Instance.Create(resourceIdentityOptions.Value, fhirClient); + } + } +} diff --git a/src/console/Microsoft.Health.Fhir.Ingest.Console.csproj b/src/console/Microsoft.Health.Fhir.Ingest.Console.csproj new file mode 100644 index 00000000..f39568af --- /dev/null +++ b/src/console/Microsoft.Health.Fhir.Ingest.Console.csproj @@ -0,0 +1,27 @@ + + + + Exe + netcoreapp3.1 + + + + + + + + + + + + + + + + + + Always + + + + diff --git a/src/console/Normalize/Processor.cs b/src/console/Normalize/Processor.cs new file mode 100644 index 00000000..998d5d3e --- /dev/null +++ b/src/console/Normalize/Processor.cs @@ -0,0 +1,94 @@ +using EnsureThat; +using Microsoft.Azure.EventHubs; +using Microsoft.Azure.WebJobs; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; +using Microsoft.Health.Events.EventConsumers; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Fhir.Ingest.Config; +using Microsoft.Health.Fhir.Ingest.Console.Template; +using Microsoft.Health.Fhir.Ingest.Data; +using Microsoft.Health.Fhir.Ingest.Service; +using Microsoft.Health.Fhir.Ingest.Telemetry; +using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using static Microsoft.Azure.EventHubs.EventData; + +namespace Microsoft.Health.Fhir.Ingest.Console.Normalize +{ + public class Processor : IEventConsumer + { + private string _templateDefinition; + private ITemplateManager _templateManager; + private ITelemetryLogger _logger; + private IConfiguration _env; + private IOptions _options; + + public Processor( + [Blob("template/%Template:DeviceContent%", FileAccess.Read)] string templateDefinition, + ITemplateManager templateManager, + IConfiguration configuration, + IOptions collectorOptions, + ITelemetryLogger logger) + { + _templateDefinition = templateDefinition; + _templateManager = templateManager; + _logger = logger; + _env = configuration; + _options = collectorOptions; + } + + public async Task ConsumeAsync(IEnumerable events) + { + EnsureArg.IsNotNull(_templateDefinition); + var templateContent = _templateManager.GetTemplateAsString(_templateDefinition); + + var templateContext = CollectionContentTemplateFactory.Default.Create(templateContent); + templateContext.EnsureValid(); + var template = templateContext.Template; + + _logger.LogMetric( + IomtMetrics.DeviceEvent(), + events.Count()); + + IEnumerable eventHubEvents = events + .Select(x => + { + var eventData = new EventData(x.Body.ToArray()); + eventData.SystemProperties = new SystemPropertiesCollection( + x.SequenceNumber, + x.EnqueuedTime.UtcDateTime, + x.Offset.ToString(), + x.PartitionId); + + foreach (KeyValuePair entry in x.SystemProperties) + { + eventData.SystemProperties.TryAdd(entry.Key, entry.Value); + } + + return eventData; + }); + + var dataNormalizationService = new MeasurementEventNormalizationService(_logger, template); + + // todo: support managed identity + var connectionString = _env.GetSection("OutputEventHub").Value; + var sb = new EventHubsConnectionStringBuilder(connectionString); + var eventHubName = sb.EntityPath; + + var collector = CreateCollector(eventHubName, connectionString, _options); + + await dataNormalizationService.ProcessAsync(eventHubEvents, collector).ConfigureAwait(false); + } + + private IAsyncCollector CreateCollector(string eventHubName, string connectionString, IOptions options) + { + var client = options.Value.GetEventHubClient(eventHubName, connectionString); + return new MeasurementToEventAsyncCollector(new EventHubService(client)); + } + } +} diff --git a/src/console/Normalize/ProcessorStartup.cs b/src/console/Normalize/ProcessorStartup.cs new file mode 100644 index 00000000..1ff56b4d --- /dev/null +++ b/src/console/Normalize/ProcessorStartup.cs @@ -0,0 +1,36 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using EnsureThat; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Health.Fhir.Ingest.Config; + +namespace Microsoft.Health.Fhir.Ingest.Console.Normalize +{ + public class ProcessorStartup + { + public ProcessorStartup(IConfiguration configuration) + { + Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + public void ConfigureServices(IServiceCollection services) + { + var outputEventHubConnection = Configuration.GetSection("OutputEventHub").Value; + var outputEventHubName = outputEventHubConnection.Substring(outputEventHubConnection.LastIndexOf('=') + 1); + + EnsureArg.IsNotNullOrEmpty(outputEventHubConnection); + EnsureArg.IsNotNullOrEmpty(outputEventHubName); + + services.Configure(options => + { + options.AddSender(outputEventHubName, outputEventHubConnection); + }); + } + } +} diff --git a/src/console/Program.cs b/src/console/Program.cs new file mode 100644 index 00000000..1ddd6228 --- /dev/null +++ b/src/console/Program.cs @@ -0,0 +1,169 @@ +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Storage.Blobs; +using EnsureThat; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Microsoft.Health.Events.EventCheckpointing; +using Microsoft.Health.Events.EventConsumers; +using Microsoft.Health.Events.EventConsumers.Service; +using Microsoft.Health.Events.EventHubProcessor; +using Microsoft.Health.Events.Repository; +using Microsoft.Health.Fhir.Ingest.Config; +using Microsoft.Health.Fhir.Ingest.Console.Storage; +using Microsoft.Health.Fhir.Ingest.Console.Template; +using Microsoft.Health.Fhir.Ingest.Service; +using Microsoft.Health.Logging.Telemetry; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Health.Fhir.Ingest.Console +{ + public class Program + { + public static async Task Main() + { + var config = GetEnvironmentConfig(); + + // determine which event hub to read from + var eventHub = Environment.GetEnvironmentVariable("WEBJOBS_NAME"); + if (eventHub == null) + { + eventHub = config.GetSection("Console:EventHub").Value; + } + + System.Console.WriteLine($"Reading from event hub: {eventHub}"); + System.Console.WriteLine($"Logs and Metrics will be written to Application Insights"); + var eventHubOptions = GetEventHubInfo(config, eventHub); + + EnsureArg.IsNotNullOrWhiteSpace(eventHubOptions.EventHubConnectionString); + EnsureArg.IsNotNullOrWhiteSpace(eventHubOptions.EventHubName); + + var eventBatchingOptions = new EventBatchingOptions(); + config.GetSection(EventBatchingOptions.Settings).Bind(eventBatchingOptions); + + var serviceProvider = GetRequiredServiceProvider(config, eventHub); + var logger = serviceProvider.GetRequiredService(); + var eventConsumers = GetEventConsumers(config, eventHub, serviceProvider, logger); + + var storageOptions = new StorageCheckpointOptions(); + config.GetSection(StorageCheckpointOptions.Settings).Bind(storageOptions); + storageOptions.BlobPrefix = eventHub; + var checkpointClient = new StorageCheckpointClient(storageOptions, logger); + + var eventConsumerService = new EventConsumerService(eventConsumers, logger); + + var ct = new CancellationToken(); + + string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; + BlobContainerClient storageClient = new BlobContainerClient(storageOptions.BlobStorageConnectionString, storageOptions.BlobContainerName); + + var eventProcessorClientOptions = new EventProcessorClientOptions(); + eventProcessorClientOptions.MaximumWaitTime = TimeSpan.FromSeconds(60); + EventProcessorClient client = new EventProcessorClient(storageClient, consumerGroup, eventHubOptions.EventHubConnectionString, eventHubOptions.EventHubName, eventProcessorClientOptions); + + var eventBatchingService = new EventBatchingService(eventConsumerService, eventBatchingOptions, checkpointClient, logger); + var eventHubReader = new EventProcessor(eventBatchingService, checkpointClient, logger); + await eventHubReader.RunAsync(client, ct); + } + + public static IConfiguration GetEnvironmentConfig() + { + IConfiguration config = new ConfigurationBuilder() + .AddJsonFile("appsettings.json", true, true) + .AddEnvironmentVariables() + .Build(); + + return config; + } + + public static ServiceProvider GetRequiredServiceProvider(IConfiguration config, string eventHub) + { + if (eventHub == "devicedata") + { + var serviceCollection = new ServiceCollection(); + Normalize.ProcessorStartup startup = new Normalize.ProcessorStartup(config); + startup.ConfigureServices(serviceCollection); + + var loggingService = new IomtLogger(config); + loggingService.ConfigureServices(serviceCollection); + + var serviceProvider = serviceCollection.BuildServiceProvider(); + return serviceProvider; + } + else if (eventHub == "normalizeddata") + { + var serviceCollection = new ServiceCollection(); + MeasurementCollectionToFhir.ProcessorStartup startup = new MeasurementCollectionToFhir.ProcessorStartup(config); + startup.ConfigureServices(serviceCollection); + + var loggingService = new IomtLogger(config); + loggingService.ConfigureServices(serviceCollection); + + var serviceProvider = serviceCollection.BuildServiceProvider(); + return serviceProvider; + } + else + { + throw new Exception("No valid event hub type was found"); + } + } + + public static EventHubOptions GetEventHubInfo(IConfiguration config, string eventHub) + { + var connectionString = eventHub == "devicedata" + ? config.GetSection("InputEventHub").Value + : config.GetSection("OutputEventHub").Value; + + var eventHubName = connectionString.Substring(connectionString.LastIndexOf('=') + 1); + return new EventHubOptions(connectionString, eventHubName); + } + + public static List GetEventConsumers(IConfiguration config, string inputEventHub, ServiceProvider sp, ITelemetryLogger logger) + { + var eventConsumers = new List(); + var templateOptions = new TemplateOptions(); + config.GetSection(TemplateOptions.Settings).Bind(templateOptions); + + EnsureArg.IsNotNull(templateOptions); + EnsureArg.IsNotNull(templateOptions.BlobContainerName); + EnsureArg.IsNotNull(templateOptions.BlobStorageConnectionString); + + var storageManager = new StorageManager( + templateOptions.BlobStorageConnectionString, + templateOptions.BlobContainerName); + + var templateManager = new TemplateManager(storageManager); + + if (inputEventHub == "devicedata") + { + var template = config.GetSection("Template:DeviceContent").Value; + var deviceDataNormalization = new Normalize.Processor(template, templateManager, config, sp.GetRequiredService>(), logger); + eventConsumers.Add(deviceDataNormalization); + } + + else if (inputEventHub == "normalizeddata") + { + var template = config.GetSection("Template:FhirMapping").Value; + var measurementImportService = ResolveMeasurementService(sp); + var measurementToFhirConsumer = new MeasurementCollectionToFhir.Processor(template, templateManager, measurementImportService, logger); + eventConsumers.Add(measurementToFhirConsumer); + } + + if (config.GetSection("Console:Debug")?.Value == "true") + { + eventConsumers.Add(new EventPrinter()); + } + + return eventConsumers; + } + + public static MeasurementFhirImportService ResolveMeasurementService(IServiceProvider services) + { + return services.GetRequiredService(); + } + } +} diff --git a/src/console/Template/ITemplateManager.cs b/src/console/Template/ITemplateManager.cs new file mode 100644 index 00000000..c32dfe1e --- /dev/null +++ b/src/console/Template/ITemplateManager.cs @@ -0,0 +1,14 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +namespace Microsoft.Health.Fhir.Ingest.Console.Template +{ + public interface ITemplateManager + { + byte[] GetTemplate(string templateName); + + string GetTemplateAsString(string templateName); + } +} diff --git a/src/console/Template/TemplateManager.cs b/src/console/Template/TemplateManager.cs new file mode 100644 index 00000000..d1d55278 --- /dev/null +++ b/src/console/Template/TemplateManager.cs @@ -0,0 +1,31 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using Microsoft.Health.Events.Repository; +using System.Text; + +namespace Microsoft.Health.Fhir.Ingest.Console.Template +{ + public class TemplateManager : ITemplateManager + { + private IRepositoryManager _respositoryManager; + public TemplateManager(IRepositoryManager repositoryManager) + { + _respositoryManager = repositoryManager; + } + + public byte[] GetTemplate(string templateName) + { + return _respositoryManager.GetItem(templateName); + } + + public string GetTemplateAsString(string templateName) + { + var templateBuffer = GetTemplate(templateName); + string templateContent = Encoding.UTF8.GetString(templateBuffer, 0, templateBuffer.Length); + return templateContent; + } + } +} diff --git a/src/console/Template/TemplateOptions.cs b/src/console/Template/TemplateOptions.cs new file mode 100644 index 00000000..a6ef11d4 --- /dev/null +++ b/src/console/Template/TemplateOptions.cs @@ -0,0 +1,17 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + + +namespace Microsoft.Health.Fhir.Ingest.Console.Storage +{ + public class TemplateOptions + { + public const string Settings = "TemplateStorage"; + + public string BlobStorageConnectionString { get; set; } + + public string BlobContainerName { get; set; } + } +} diff --git a/src/console/appsettings.json b/src/console/appsettings.json new file mode 100644 index 00000000..2e22ba90 --- /dev/null +++ b/src/console/appsettings.json @@ -0,0 +1,24 @@ +{ + "APPINSIGHTS_INSTRUMENTATIONKEY": "", + "EventBatching:FlushTimespan": 300, + "EventBatching:MaxEvents": 300, + "Storage:BlobStorageConnectionString": "", + "Storage:BlobContainerName": "", + "Storage:BlobPrefix": "", + "Storage:CheckpointBatchCount": 5, + "TemplateStorage:BlobStorageConnectionString": "", + "TemplateStorage:BlobContainerName": "", + "Console:EventHub": "devicedata", + "FhirClient:UseManagedIdentity": "true", + "FhirService:Authority": "", + "FhirService:ClientId": "", + "FhirService:ClientSecret": "", + "FhirService:Resource": "", + "FhirService:Url": "", + "InputEventHub": "", + "OutputEventHub": "", + "ResourceIdentity:ResourceIdentityServiceType": "Create", + "ResourceIdentity:DefaultDeviceIdentifierSystem": "", + "Template:DeviceContent": "devicecontent.json", + "Template:FhirMapping": "fhirmapping.json" +} \ No newline at end of file diff --git a/src/func/Microsoft.Health.Fhir.Ingest.Host/IomtConnectorFunctions.cs b/src/func/Microsoft.Health.Fhir.Ingest.Host/IomtConnectorFunctions.cs index 4548fc74..85b5ee63 100644 --- a/src/func/Microsoft.Health.Fhir.Ingest.Host/IomtConnectorFunctions.cs +++ b/src/func/Microsoft.Health.Fhir.Ingest.Host/IomtConnectorFunctions.cs @@ -17,6 +17,7 @@ using Microsoft.Health.Fhir.Ingest.Host; using Microsoft.Health.Fhir.Ingest.Telemetry; using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; namespace Microsoft.Health.Fhir.Ingest.Service { diff --git a/src/func/Microsoft.Health.Fhir.Ingest.Host/Startup.cs b/src/func/Microsoft.Health.Fhir.Ingest.Host/Startup.cs index 59ea480e..574470fb 100644 --- a/src/func/Microsoft.Health.Fhir.Ingest.Host/Startup.cs +++ b/src/func/Microsoft.Health.Fhir.Ingest.Host/Startup.cs @@ -10,7 +10,7 @@ using Microsoft.ApplicationInsights.Extensibility; using Microsoft.Azure.Functions.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Health.Fhir.Ingest.Telemetry; +using Microsoft.Health.Logging.Telemetry; namespace Microsoft.Health.Fhir.Ingest.Service { diff --git a/src/lib/Microsoft.Health.Common/Telemetry/Metrics/Dimensions/DimensionNames.cs b/src/lib/Microsoft.Health.Common/Telemetry/Metrics/Dimensions/DimensionNames.cs index d76f52a3..cdfae29b 100644 --- a/src/lib/Microsoft.Health.Common/Telemetry/Metrics/Dimensions/DimensionNames.cs +++ b/src/lib/Microsoft.Health.Common/Telemetry/Metrics/Dimensions/DimensionNames.cs @@ -22,6 +22,16 @@ public static class DimensionNames /// public static string Operation => nameof(DimensionNames.Operation); + /// + /// A metric dimension that represents a timestamp property of a metric. + /// + public static string Timestamp => nameof(DimensionNames.Timestamp); + + /// + /// A metric dimension that represents an identifier related to the metric emitted. + /// + public static string Identifier => nameof(DimensionNames.Identifier); + /// /// A metric dimension for a error type. /// diff --git a/src/lib/Microsoft.Health.Events/EventCheckpointing/ICheckpointClient.cs b/src/lib/Microsoft.Health.Events/EventCheckpointing/ICheckpointClient.cs new file mode 100644 index 00000000..0023844f --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventCheckpointing/ICheckpointClient.cs @@ -0,0 +1,19 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Threading.Tasks; +using Microsoft.Health.Events.Model; + +namespace Microsoft.Health.Events.EventCheckpointing +{ + public interface ICheckpointClient + { + Task SetCheckpointAsync(IEventMessage eventArg); + + Task PublishCheckpointAsync(string partitionId); + + Task GetCheckpointForPartitionAsync(string partitionId); + } +} diff --git a/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointClient.cs b/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointClient.cs new file mode 100644 index 00000000..dfee3ff5 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointClient.cs @@ -0,0 +1,164 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using EnsureThat; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Events.Telemetry; +using Microsoft.Health.Logging.Telemetry; + +namespace Microsoft.Health.Events.EventCheckpointing +{ + public class StorageCheckpointClient : ICheckpointClient + { + private ConcurrentDictionary _checkpoints; + private ConcurrentDictionary _lastCheckpointTracker; + private int _lastCheckpointMaxCount; + private BlobContainerClient _storageClient; + private ITelemetryLogger _log; + + public StorageCheckpointClient(StorageCheckpointOptions options, ITelemetryLogger log) + { + EnsureArg.IsNotNull(options); + EnsureArg.IsNotNullOrWhiteSpace(options.BlobPrefix); + EnsureArg.IsNotNullOrWhiteSpace(options.BlobStorageConnectionString); + EnsureArg.IsNotNullOrWhiteSpace(options.BlobContainerName); + EnsureArg.IsNotNullOrWhiteSpace(options.CheckpointBatchCount); + + BlobPrefix = options.BlobPrefix; + + _lastCheckpointMaxCount = int.Parse(options.CheckpointBatchCount); + _checkpoints = new ConcurrentDictionary(); + _lastCheckpointTracker = new ConcurrentDictionary(); + _storageClient = new BlobContainerClient(options.BlobStorageConnectionString, options.BlobContainerName); + _log = log; + } + + public string BlobPrefix { get; } + + public async Task UpdateCheckpointAsync(Checkpoint checkpoint) + { + EnsureArg.IsNotNull(checkpoint); + EnsureArg.IsNotNullOrWhiteSpace(checkpoint.Id); + var lastProcessed = EnsureArg.IsNotNullOrWhiteSpace(checkpoint.LastProcessed.DateTime.ToString("MM/dd/yyyy hh:mm:ss.fff tt")); + + var blobName = $"{BlobPrefix}/checkpoint/{checkpoint.Id}"; + var blobClient = _storageClient.GetBlobClient(blobName); + + var metadata = new Dictionary() + { + { "LastProcessed", lastProcessed }, + }; + + try + { + try + { + await blobClient.SetMetadataAsync(metadata); + } + catch (RequestFailedException ex) when ((ex.ErrorCode == BlobErrorCode.BlobNotFound) || (ex.ErrorCode == BlobErrorCode.ContainerNotFound)) + { + using (var blobContent = new MemoryStream(Array.Empty())) + { + await blobClient.UploadAsync(blobContent, metadata: metadata).ConfigureAwait(false); + } + } + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + _log.LogError(new Exception($"Unable to update checkpoint. {ex.Message}")); + } + } + + public Task GetCheckpointForPartitionAsync(string partitionIdentifier) + { + var prefix = $"{BlobPrefix}/checkpoint/{partitionIdentifier}"; + + Task GetCheckpointAsync() + { + var checkpoint = new Checkpoint(); + + foreach (BlobItem blob in _storageClient.GetBlobs(traits: BlobTraits.Metadata, states: BlobStates.All, prefix: prefix, cancellationToken: CancellationToken.None)) + { + var partitionId = blob.Name.Split('/').Last(); + DateTimeOffset lastEventTimestamp = DateTime.MinValue; + + if (blob.Metadata.TryGetValue("LastProcessed", out var str)) + { + DateTimeOffset.TryParse(str, null, DateTimeStyles.AssumeUniversal, out lastEventTimestamp); + } + + checkpoint.Prefix = BlobPrefix; + checkpoint.Id = partitionId; + checkpoint.LastProcessed = lastEventTimestamp; + } + + return Task.FromResult(checkpoint); + } + + try + { + // todo: consider retries + return GetCheckpointAsync(); + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + _log.LogError(new Exception($"Unable to get checkpoint for partition. {ex.Message}")); + throw; + } + } + + public async Task SetCheckpointAsync(IEventMessage eventArgs) + { + EnsureArg.IsNotNull(eventArgs); + EnsureArg.IsNotNullOrWhiteSpace(eventArgs.PartitionId); + + try + { + var partitionId = eventArgs.PartitionId; + var checkpoint = new Checkpoint(); + checkpoint.LastProcessed = eventArgs.EnqueuedTime; + checkpoint.Id = partitionId; + checkpoint.Prefix = BlobPrefix; + + _checkpoints[partitionId] = checkpoint; + var count = _lastCheckpointTracker.AddOrUpdate(partitionId, 1, (key, value) => value + 1); + + if (count >= _lastCheckpointMaxCount) + { + await PublishCheckpointAsync(partitionId); + _log.LogMetric(EventMetrics.EventWatermark(partitionId, eventArgs.EnqueuedTime.UtcDateTime), 1); + _lastCheckpointTracker[partitionId] = 0; + } + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + _log.LogError(new Exception($"Unable to set checkpoint. {ex.Message}")); + } + } + + public async Task PublishCheckpointAsync(string partitionId) + { + Checkpoint checkpoint = _checkpoints[partitionId]; + await UpdateCheckpointAsync(checkpoint); + } + } +} diff --git a/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointOptions.cs b/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointOptions.cs new file mode 100644 index 00000000..a6b0ea5b --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointOptions.cs @@ -0,0 +1,20 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +namespace Microsoft.Health.Events.EventCheckpointing +{ + public class StorageCheckpointOptions + { + public const string Settings = "Storage"; + + public string BlobStorageConnectionString { get; set; } + + public string BlobContainerName { get; set; } + + public string BlobPrefix { get; set; } + + public string CheckpointBatchCount { get; set; } + } +} diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/EventPrinter.cs b/src/lib/Microsoft.Health.Events/EventConsumers/EventPrinter.cs new file mode 100644 index 00000000..4b09e07b --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventConsumers/EventPrinter.cs @@ -0,0 +1,30 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using EnsureThat; +using Microsoft.Health.Events.Model; + +namespace Microsoft.Health.Events.EventConsumers +{ + public class EventPrinter : IEventConsumer + { + public Task ConsumeAsync(IEnumerable events) + { + EnsureArg.IsNotNull(events); + foreach (EventMessage evt in events) + { + string message = Encoding.UTF8.GetString(evt.Body.ToArray()); + var enqueuedTime = evt.EnqueuedTime.UtcDateTime; + Console.WriteLine($"Enqueued Time: {enqueuedTime} Event Message: \"{message}\""); + } + + return Task.CompletedTask; + } + } +} diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/IEventConsumer.cs b/src/lib/Microsoft.Health.Events/EventConsumers/IEventConsumer.cs new file mode 100644 index 00000000..a512bbfb --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventConsumers/IEventConsumer.cs @@ -0,0 +1,16 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Health.Events.Model; + +namespace Microsoft.Health.Events.EventConsumers +{ + public interface IEventConsumer + { + Task ConsumeAsync(IEnumerable events); + } +} diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingOptions.cs b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingOptions.cs new file mode 100644 index 00000000..20f8f40c --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingOptions.cs @@ -0,0 +1,16 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +namespace Microsoft.Health.Events.EventConsumers.Service +{ + public class EventBatchingOptions + { + public const string Settings = "EventBatching"; + + public int FlushTimespan { get; set; } + + public int MaxEvents { get; set; } + } +} diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingService.cs b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingService.cs new file mode 100644 index 00000000..9354a40e --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventBatchingService.cs @@ -0,0 +1,145 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using EnsureThat; +using Microsoft.Health.Events.EventCheckpointing; +using Microsoft.Health.Events.EventConsumers.Service.Infrastructure; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Logging.Telemetry; + +namespace Microsoft.Health.Events.EventConsumers.Service +{ + public class EventBatchingService : IEventConsumerService + { + private ConcurrentDictionary _eventPartitions; + private int _maxEvents; + private TimeSpan _flushTimespan; + private IEventConsumerService _eventConsumerService; + private ICheckpointClient _checkpointClient; + private ITelemetryLogger _logger; + private const int _timeBuffer = -5; + + public EventBatchingService(IEventConsumerService eventConsumerService, EventBatchingOptions options, ICheckpointClient checkpointClient, ITelemetryLogger logger) + { + EnsureArg.IsNotNull(options); + EnsureArg.IsInt(options.MaxEvents); + EnsureArg.IsInt(options.FlushTimespan); + + _eventPartitions = new ConcurrentDictionary(); + _eventConsumerService = eventConsumerService; + _maxEvents = options.MaxEvents; + _flushTimespan = TimeSpan.FromSeconds(options.FlushTimespan); + _checkpointClient = checkpointClient; + _logger = logger; + } + + public EventPartition GetPartition(string partitionId) + { + EnsureArg.IsNotNullOrWhiteSpace(partitionId); + + if (!_eventPartitions.ContainsKey(partitionId)) + { + throw new Exception($"Partition with identifier {partitionId} does not exist"); + } + + return _eventPartitions[partitionId]; + } + + private bool EventPartitionExists(string partitionId) + { + return _eventPartitions.ContainsKey(partitionId); + } + + private EventPartition CreatePartitionIfMissing(string partitionId, DateTime initTime, TimeSpan flushTimespan) + { + return _eventPartitions.GetOrAdd(partitionId, new EventPartition(partitionId, initTime, flushTimespan, _logger)); + } + + public Task ConsumeEvent(IEventMessage eventArg) + { + EnsureArg.IsNotNull(eventArg); + + var partitionId = eventArg.PartitionId; + var eventEnqueuedTime = eventArg.EnqueuedTime.UtcDateTime; + + if (eventArg is MaximumWaitEvent) + { + if (EventPartitionExists(partitionId)) + { + var windowThresholdTime = GetPartition(partitionId).GetPartitionWindow(); + ThresholdWaitReached(partitionId, windowThresholdTime); + } + } + else + { + var partition = CreatePartitionIfMissing(partitionId, eventEnqueuedTime, _flushTimespan); + + partition.Enqueue(eventArg); + + var windowThresholdTime = partition.GetPartitionWindow(); + if (eventEnqueuedTime > windowThresholdTime) + { + ThresholdTimeReached(partitionId, eventArg, windowThresholdTime); + return Task.CompletedTask; + } + + if (partition.GetPartitionBatchCount() >= _maxEvents) + { + ThresholdCountReached(partitionId); + } + } + + return Task.CompletedTask; + } + + // todo: fix -"Collection was modified; enumeration operation may not execute." + private async void ThresholdCountReached(string partitionId) + { + _logger.LogTrace($"Partition {partitionId} threshold count {_maxEvents} was reached."); + var events = await GetPartition(partitionId).Flush(_maxEvents); + await _eventConsumerService.ConsumeEvents(events); + await UpdateCheckpoint(events); + } + + private async void ThresholdTimeReached(string partitionId, IEventMessage eventArg, DateTime windowEnd) + { + _logger.LogTrace($"Partition {partitionId} threshold time {_eventPartitions[partitionId].GetPartitionWindow()} was reached."); + var queue = GetPartition(partitionId); + var events = await queue.Flush(windowEnd); + queue.IncrementPartitionWindow(eventArg.EnqueuedTime.UtcDateTime); + await _eventConsumerService.ConsumeEvents(events); + await UpdateCheckpoint(events); + } + + private async void ThresholdWaitReached(string partitionId, DateTime windowEnd) + { + if (windowEnd < DateTime.UtcNow.AddSeconds(_timeBuffer)) + { + _logger.LogTrace($"Partition {partitionId} threshold wait reached. Flushing {_eventPartitions[partitionId].GetPartitionBatchCount()} events up to: {windowEnd}"); + var events = await GetPartition(partitionId).Flush(windowEnd); + await _eventConsumerService.ConsumeEvents(events); + await UpdateCheckpoint(events); + } + } + + private async Task UpdateCheckpoint(List events) + { + if (events.Count > 0) + { + var eventCheckpoint = events[events.Count - 1]; + await _checkpointClient.SetCheckpointAsync(eventCheckpoint); + } + } + + public Task ConsumeEvents(IEnumerable events) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventConsumerService.cs b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventConsumerService.cs new file mode 100644 index 00000000..f483bf13 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventConsumers/Service/EventConsumerService.cs @@ -0,0 +1,78 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Logging.Telemetry; + +namespace Microsoft.Health.Events.EventConsumers.Service +{ + public class EventConsumerService : IEventConsumerService + { + private readonly IEnumerable _eventConsumers; + private const int _maximumBackoffMs = 32000; + private ITelemetryLogger _logger; + + public EventConsumerService(IEnumerable eventConsumers, ITelemetryLogger logger) + { + _eventConsumers = eventConsumers; + _logger = logger; + } + + public Task ConsumeEvent(IEventMessage eventArg) + { + throw new NotImplementedException(); + } + + public async Task ConsumeEvents(IEnumerable events) + { + if (events.Any()) + { + foreach (IEventConsumer eventConsumer in _eventConsumers) + { + await OperationWithRetryAsync(eventConsumer, events); + } + } + } + + private async Task OperationWithRetryAsync(IEventConsumer eventConsumer, IEnumerable events) + { + int currentRetry = 0; + double backoffMs = 0; + Random random = new Random(); + bool operationComplete = false; + + while (!operationComplete) + { + try + { + if (currentRetry > 0 && backoffMs < _maximumBackoffMs) + { + int randomMs = random.Next(0, 1000); + backoffMs = Math.Pow(2000, currentRetry) + randomMs; + await Task.Delay((int)backoffMs); + } + + await TryOperationAsync(eventConsumer, events).ConfigureAwait(false); + break; + } +#pragma warning disable CA1031 + catch (Exception e) +#pragma warning restore CA1031 + { + _logger.LogError(e); + } + } + } + + private static async Task TryOperationAsync(IEventConsumer eventConsumer, IEnumerable events) + { + await eventConsumer.ConsumeAsync(events); + } + } +} diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/Service/IEventConsumerService.cs b/src/lib/Microsoft.Health.Events/EventConsumers/Service/IEventConsumerService.cs new file mode 100644 index 00000000..d003006a --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventConsumers/Service/IEventConsumerService.cs @@ -0,0 +1,18 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Health.Events.Model; + +namespace Microsoft.Health.Events.EventConsumers.Service +{ + public interface IEventConsumerService + { + Task ConsumeEvents(IEnumerable events); + + Task ConsumeEvent(IEventMessage eventArg); + } +} diff --git a/src/lib/Microsoft.Health.Events/EventConsumers/Service/Infrastructure/EventPartition.cs b/src/lib/Microsoft.Health.Events/EventConsumers/Service/Infrastructure/EventPartition.cs new file mode 100644 index 00000000..00ca635f --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventConsumers/Service/Infrastructure/EventPartition.cs @@ -0,0 +1,100 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Events.Telemetry; +using Microsoft.Health.Logging.Telemetry; + +namespace Microsoft.Health.Events.EventConsumers.Service.Infrastructure +{ + public class EventPartition + { + private string _partitionId; + private ConcurrentQueue _partition; + private DateTime _partitionWindow; + private TimeSpan _flushTimespan; + private ITelemetryLogger _logger; + + public EventPartition(string partitionId, DateTime initDateTime, TimeSpan flushTimespan, ITelemetryLogger logger) + { + _partitionId = partitionId; + _partition = new ConcurrentQueue(); + _partitionWindow = initDateTime.Add(flushTimespan); + _flushTimespan = flushTimespan; + _logger = logger; + } + + public void Enqueue(IEventMessage eventArg) + { + _partition.Enqueue(eventArg); + } + + public void IncrementPartitionWindow(DateTime dateTime) + { + // todo: consider computing instead of while loop. + while (dateTime >= _partitionWindow) + { + _partitionWindow = _partitionWindow.Add(_flushTimespan); + } + } + + public DateTime GetPartitionWindow() + { + return _partitionWindow; + } + + public int GetPartitionBatchCount() + { + return _partition.Count; + } + + // flush a fixed number of events + public Task> Flush(int numEvents) + { + var count = 0; + var events = new List(); + + while (count < numEvents) + { + if (_partition.TryDequeue(out var dequeuedEvent)) + { + events.Add(dequeuedEvent); + count++; + } + } + + _logger.LogTrace($"Flushed {events.Count} events on partition {_partitionId}"); + _logger.LogMetric(EventMetrics.EventsFlushed(), events.Count); + return Task.FromResult(events); + } + + // flush up to a date time + public Task> Flush(DateTime dateTime) + { + var events = new List(); + while (_partition.TryPeek(out var eventData)) + { + var enqueuedUtc = eventData.EnqueuedTime.UtcDateTime; + if (enqueuedUtc <= dateTime) + { + _partition.TryDequeue(out var dequeuedEvent); + events.Add(dequeuedEvent); + } + else + { + break; + } + } + + _logger.LogTrace($"Flushed {events.Count} events up to {dateTime} on partition {_partitionId}"); + _logger.LogMetric(EventMetrics.EventsFlushed(), events.Count); + return Task.FromResult(events); + } + } +} \ No newline at end of file diff --git a/src/lib/Microsoft.Health.Events/EventHubProcessor/EventHubOptions.cs b/src/lib/Microsoft.Health.Events/EventHubProcessor/EventHubOptions.cs new file mode 100644 index 00000000..74e787a2 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventHubProcessor/EventHubOptions.cs @@ -0,0 +1,22 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +namespace Microsoft.Health.Events.EventHubProcessor +{ + public class EventHubOptions + { + public const string Settings = "EventHub"; + + public EventHubOptions(string connectionString, string name) + { + EventHubConnectionString = connectionString; + EventHubName = name; + } + + public string EventHubConnectionString { get; set; } + + public string EventHubName { get; set; } + } +} diff --git a/src/lib/Microsoft.Health.Events/EventHubProcessor/EventProcessor.cs b/src/lib/Microsoft.Health.Events/EventHubProcessor/EventProcessor.cs new file mode 100644 index 00000000..a256d1b0 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/EventHubProcessor/EventProcessor.cs @@ -0,0 +1,110 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Processor; +using EnsureThat; +using Microsoft.Health.Events.EventCheckpointing; +using Microsoft.Health.Events.EventConsumers.Service; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Events.Telemetry; +using Microsoft.Health.Logging.Telemetry; + +namespace Microsoft.Health.Events.EventHubProcessor +{ + public class EventProcessor + { + private IEventConsumerService _eventConsumerService; + private ICheckpointClient _checkpointClient; + private ITelemetryLogger _logger; + + public EventProcessor(IEventConsumerService eventConsumerService, ICheckpointClient checkpointClient, ITelemetryLogger logger) + { + _eventConsumerService = eventConsumerService; + _checkpointClient = checkpointClient; + _logger = logger; + } + + public async Task RunAsync(EventProcessorClient processor, CancellationToken ct) + { + EnsureArg.IsNotNull(processor); + + // Processes two types of events + // 1) Event hub events + // 2) Maximum wait events. These are generated when we have not received an event hub + // event for a certain time period and this event is used to flush events in the current window. + Task ProcessEventHandler(ProcessEventArgs eventArgs) + { + IEventMessage evt; + if (eventArgs.HasEvent) + { + evt = EventMessageFactory.CreateEvent(eventArgs); + } + else + { + evt = new MaximumWaitEvent(eventArgs.Partition.PartitionId, DateTime.UtcNow); + } + + _eventConsumerService.ConsumeEvent(evt); + + return Task.CompletedTask; + } + + // todo: consider retry + Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) + { + _logger.LogError(eventArgs.Exception); + return Task.CompletedTask; + } + + async Task ProcessInitializingHandler(PartitionInitializingEventArgs initArgs) + { + var partitionId = initArgs.PartitionId; + _logger.LogTrace($"Initializing partition {partitionId}"); + + try + { + var checkpoint = await _checkpointClient.GetCheckpointForPartitionAsync(partitionId); + initArgs.DefaultStartingPosition = EventPosition.FromEnqueuedTime(checkpoint.LastProcessed); + _logger.LogTrace($"Starting to read partition {partitionId} from checkpoint {checkpoint.LastProcessed}"); + _logger.LogMetric(EventMetrics.EventHubPartitionInitialized(), 1); + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + _logger.LogTrace($"Failed to initialize partition {partitionId} from checkpoint"); + _logger.LogError(ex); + } + } + + processor.ProcessEventAsync += ProcessEventHandler; + processor.ProcessErrorAsync += ProcessErrorHandler; + processor.PartitionInitializingAsync += ProcessInitializingHandler; + + try + { + Console.WriteLine($"Starting event hub processor at {DateTime.UtcNow}"); + await processor.StartProcessingAsync(); + + while (!ct.IsCancellationRequested) + { + } + + await processor.StopProcessingAsync(); + } + finally + { + processor.ProcessEventAsync -= ProcessEventHandler; + processor.ProcessErrorAsync -= ProcessErrorHandler; + processor.PartitionInitializingAsync -= ProcessInitializingHandler; + } + } + } +} diff --git a/src/lib/Microsoft.Health.Events/Microsoft.Health.Events.csproj b/src/lib/Microsoft.Health.Events/Microsoft.Health.Events.csproj new file mode 100644 index 00000000..22d52d98 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Microsoft.Health.Events.csproj @@ -0,0 +1,41 @@ + + + netcoreapp3.1 + ..\..\..\CustomAnalysisRules.ruleset + true + 7.3 + + + true + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + diff --git a/src/lib/Microsoft.Health.Events/Microsoft.Health.Events.sln b/src/lib/Microsoft.Health.Events/Microsoft.Health.Events.sln new file mode 100644 index 00000000..dc46272d --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Microsoft.Health.Events.sln @@ -0,0 +1,27 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.30523.141 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Events", "Microsoft.Health.Events.csproj", "{62ABBEA7-F031-4BCC-A4D4-2CC535BBFEE7}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{2D9C34D3-4743-45D9-B208-D87283BF783A}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {62ABBEA7-F031-4BCC-A4D4-2CC535BBFEE7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {62ABBEA7-F031-4BCC-A4D4-2CC535BBFEE7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {62ABBEA7-F031-4BCC-A4D4-2CC535BBFEE7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {62ABBEA7-F031-4BCC-A4D4-2CC535BBFEE7}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {B4435849-B7E6-4A6E-95AD-672F6F3E188A} + EndGlobalSection +EndGlobal diff --git a/src/lib/Microsoft.Health.Events/Model/Checkpoint.cs b/src/lib/Microsoft.Health.Events/Model/Checkpoint.cs new file mode 100644 index 00000000..d0e15dcd --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Model/Checkpoint.cs @@ -0,0 +1,18 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; + +namespace Microsoft.Health.Events.Model +{ + public class Checkpoint + { + public string Prefix { get; set; } + + public string Id { get; set; } + + public DateTimeOffset LastProcessed { get; set; } + } +} diff --git a/src/lib/Microsoft.Health.Events/Model/EventMessage.cs b/src/lib/Microsoft.Health.Events/Model/EventMessage.cs new file mode 100644 index 00000000..c539d72b --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Model/EventMessage.cs @@ -0,0 +1,51 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; + +namespace Microsoft.Health.Events.Model +{ + public class EventMessage : IEventMessage + { + public EventMessage(string partitionId, DateTime dateTime) + { + EnqueuedTime = dateTime; + PartitionId = partitionId; + } + + public EventMessage( + string partitionId, + ReadOnlyMemory body, + long sequenceNumber, + long offset, + DateTimeOffset enqueuedTime, + IDictionary properties, + IReadOnlyDictionary systemProperties) + { + PartitionId = partitionId; + Body = body; + SequenceNumber = sequenceNumber; + Offset = offset; + EnqueuedTime = enqueuedTime; + Properties = new Dictionary(properties); + SystemProperties = new Dictionary(systemProperties); + } + + public string PartitionId { get; } + + public ReadOnlyMemory Body { get; } + + public long SequenceNumber { get; } + + public long Offset { get; } + + public DateTimeOffset EnqueuedTime { get; } + + public IDictionary Properties { get; } + + public IReadOnlyDictionary SystemProperties { get; } + } +} diff --git a/src/lib/Microsoft.Health.Events/Model/EventMessageFactory.cs b/src/lib/Microsoft.Health.Events/Model/EventMessageFactory.cs new file mode 100644 index 00000000..79fd8dfc --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Model/EventMessageFactory.cs @@ -0,0 +1,26 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using Azure.Messaging.EventHubs.Processor; + +namespace Microsoft.Health.Events.Model +{ + public static class EventMessageFactory + { + public static IEventMessage CreateEvent(ProcessEventArgs eventArgs) + { + var eventMessage = new EventMessage( + eventArgs.Partition.PartitionId, + eventArgs.Data.Body, + eventArgs.Data.Offset, + eventArgs.Data.SequenceNumber, + eventArgs.Data.EnqueuedTime.UtcDateTime, + eventArgs.Data.Properties, + eventArgs.Data.SystemProperties); + + return eventMessage; + } + } +} diff --git a/src/lib/Microsoft.Health.Events/Model/IEventMessage.cs b/src/lib/Microsoft.Health.Events/Model/IEventMessage.cs new file mode 100644 index 00000000..18c1dde2 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Model/IEventMessage.cs @@ -0,0 +1,26 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- +using System; +using System.Collections.Generic; + +namespace Microsoft.Health.Events.Model +{ + public interface IEventMessage + { + string PartitionId { get; } + + ReadOnlyMemory Body { get; } + + long SequenceNumber { get; } + + long Offset { get; } + + DateTimeOffset EnqueuedTime { get; } + + IDictionary Properties { get; } + + IReadOnlyDictionary SystemProperties { get; } + } +} diff --git a/src/lib/Microsoft.Health.Events/Model/MaximumWaitEvent.cs b/src/lib/Microsoft.Health.Events/Model/MaximumWaitEvent.cs new file mode 100644 index 00000000..a759da11 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Model/MaximumWaitEvent.cs @@ -0,0 +1,17 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; + +namespace Microsoft.Health.Events.Model +{ + public class MaximumWaitEvent : EventMessage + { + public MaximumWaitEvent(string partitionId, DateTime dateTime) + : base(partitionId, dateTime) + { + } + } +} diff --git a/src/lib/Microsoft.Health.Events/Repository/IRepositoryManager.cs b/src/lib/Microsoft.Health.Events/Repository/IRepositoryManager.cs new file mode 100644 index 00000000..84e218fe --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Repository/IRepositoryManager.cs @@ -0,0 +1,12 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +namespace Microsoft.Health.Events.Repository +{ + public interface IRepositoryManager + { + byte[] GetItem(string itemName); + } +} diff --git a/src/lib/Microsoft.Health.Events/Repository/StorageManager.cs b/src/lib/Microsoft.Health.Events/Repository/StorageManager.cs new file mode 100644 index 00000000..6547b0e4 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Repository/StorageManager.cs @@ -0,0 +1,38 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.IO; +using Azure.Storage.Blobs; +using EnsureThat; + +namespace Microsoft.Health.Events.Repository +{ + public class StorageManager : IRepositoryManager + { + private BlobContainerClient _blobContainer; + + public StorageManager(string connectionString, string blobContainerName) + { + EnsureArg.IsNotNull(connectionString); + EnsureArg.IsNotNull(blobContainerName); + + _blobContainer = new BlobContainerClient(connectionString, blobContainerName); + } + + public byte[] GetItem(string itemName) + { + EnsureArg.IsNotNull(itemName); + + var blockBlob = _blobContainer.GetBlobClient(itemName); + + using (var memoryStream = new MemoryStream()) + { + blockBlob.DownloadTo(memoryStream); + byte[] itemContent = memoryStream.ToArray(); + return itemContent; + } + } + } +} diff --git a/src/lib/Microsoft.Health.Events/Telemetry/Metrics/EventMetrics.cs b/src/lib/Microsoft.Health.Events/Telemetry/Metrics/EventMetrics.cs new file mode 100644 index 00000000..725b08e8 --- /dev/null +++ b/src/lib/Microsoft.Health.Events/Telemetry/Metrics/EventMetrics.cs @@ -0,0 +1,104 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using Microsoft.Health.Common.Telemetry; + +namespace Microsoft.Health.Events.Telemetry +{ + /// + /// Defines known metrics and metric dimensions for use in Application Insights + /// + public static class EventMetrics + { + private static string _nameDimension = DimensionNames.Name; + private static string _categoryDimension = DimensionNames.Category; + private static string _timeDimension = DimensionNames.Timestamp; + private static string _partitionDimension = DimensionNames.Identifier; + + private static Metric _eventHubPartitionInitialized = new Metric( + "EventHubPartitionInitialized", + new Dictionary + { + { _nameDimension, "EventHubPartitionInitialized" }, + { _categoryDimension, Category.Traffic }, + }); + + private static Metric _eventBatchCreated = new Metric( + "EventBatchCreated", + new Dictionary + { + { _nameDimension, "EventBatchCreated" }, + { _categoryDimension, Category.Traffic }, + }); + + private static Metric _eventsFlushed = new Metric( + "EventsFlushed", + new Dictionary + { + { _nameDimension, "EventsFlushed" }, + { _categoryDimension, Category.Traffic }, + }); + + private static Metric _eventsConsumed = new Metric( + "EventsConsumed", + new Dictionary + { + { _nameDimension, "EventsConsumed" }, + { _categoryDimension, Category.Traffic }, + }); + + /// + /// Signals that an event hub partition has been intialized. + /// + public static Metric EventHubPartitionInitialized() + { + return _eventHubPartitionInitialized; + } + + /// + /// Signals that a batch of event hub events was created. + /// + public static Metric EventBatchCreated() + { + return _eventBatchCreated; + } + + /// + /// Signals that a batch of event hub events was flushed. + /// + public static Metric EventsFlushed() + { + return _eventsFlushed; + } + + /// + /// Signals that a batch of event hub events was consumed downstream. + /// + public static Metric EventsConsumed() + { + return _eventsConsumed; + } + + /// + /// Signals that a new watermark was published for a partition. + /// + /// The partition id of the event hub + /// The datetime of the watermark + public static Metric EventWatermark(string partitionId, DateTime dateTime) + { + return new Metric( + "EventsWatermarkUpdated", + new Dictionary + { + { _nameDimension, "EventsWatermarkUpdated" }, + { _timeDimension, dateTime.ToString() }, + { _partitionDimension, partitionId }, + { _categoryDimension, Category.Latency }, + }); + } + } +} diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj b/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj index be734a7e..a50366a6 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj @@ -46,6 +46,7 @@ + diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs index 0471f206..0114ac84 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementEventNormalizationService.cs @@ -15,6 +15,7 @@ using Microsoft.Health.Fhir.Ingest.Data; using Microsoft.Health.Fhir.Ingest.Telemetry; using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; using Newtonsoft.Json.Linq; namespace Microsoft.Health.Fhir.Ingest.Service diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementFhirImportService.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementFhirImportService.cs index 6a38f369..818c2e7c 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementFhirImportService.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Service/MeasurementFhirImportService.cs @@ -11,10 +11,12 @@ using EnsureThat; using Microsoft.Health.Common.Service; using Microsoft.Health.Common.Telemetry; +using Microsoft.Health.Events.Model; using Microsoft.Health.Fhir.Ingest.Config; using Microsoft.Health.Fhir.Ingest.Data; using Microsoft.Health.Fhir.Ingest.Telemetry; using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -31,15 +33,34 @@ public MeasurementFhirImportService(FhirImportService fhirImportService, Measure } public async Task ProcessStreamAsync(Stream data, string templateDefinition, ITelemetryLogger log) + { + var template = BuildTemplate(templateDefinition, log); + var measurementGroups = await ParseAsync(data, log).ConfigureAwait(false); + + await ProcessMeasurementGroups(measurementGroups, template, log).ConfigureAwait(false); + } + + public async Task ProcessEventsAsync(IEnumerable events, string templateDefinition, ITelemetryLogger log) + { + var template = BuildTemplate(templateDefinition, log); + var measurementGroups = ParseEventData(events, log); + + await ProcessMeasurementGroups(measurementGroups, template, log).ConfigureAwait(false); + } + + private ILookupTemplate BuildTemplate(string templateDefinition, ITelemetryLogger log) { EnsureArg.IsNotNull(templateDefinition, nameof(templateDefinition)); EnsureArg.IsNotNull(log, nameof(log)); + var templateContext = Options.TemplateFactory.Create(templateDefinition); templateContext.EnsureValid(); - var template = templateContext.Template; - var measurementGroups = await ParseAsync(data, log).ConfigureAwait(false); + return templateContext.Template; + } + private async Task ProcessMeasurementGroups(IEnumerable measurementGroups, ILookupTemplate template, ITelemetryLogger log) + { // Group work by device to avoid race conditions when resource creation is enabled. var workItems = measurementGroups.GroupBy(grp => grp.DeviceId) .Select(grp => new Func( @@ -87,6 +108,28 @@ private static async Task> ParseAsync(Stream data return measurementGroups; } + private static IEnumerable ParseEventData(IEnumerable data, ITelemetryLogger log) + { + // Deserialize events into measurements and then group according to the device, type, and other factors + return data.Select(e => JsonConvert.DeserializeObject(System.Text.Encoding.Default.GetString(e.Body.ToArray()))) + .GroupBy(m => $"{m.DeviceId}-{m.Type}-{m.PatientId}-{m.EncounterId}-{m.CorrelationId}") + .Select(g => + { + var measurements = g.ToList(); + _ = CalculateMetricsAsync(measurements, log).ConfigureAwait(false); + return new MeasurementGroup + { + Data = measurements, + MeasureType = measurements[0].Type, + CorrelationId = measurements[0].CorrelationId, + DeviceId = measurements[0].DeviceId, + EncounterId = measurements[0].EncounterId, + PatientId = measurements[0].PatientId, + }; + }) + .ToArray(); + } + private static async Task CalculateMetricsAsync(IList measurements, ITelemetryLogger log) { await Task.Run(() => @@ -120,4 +163,4 @@ await Task.Run(() => }).ConfigureAwait(false); } } -} +} \ No newline at end of file diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/ExceptionTelemetryProcessor.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/ExceptionTelemetryProcessor.cs index c0850d37..630598cf 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/ExceptionTelemetryProcessor.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/ExceptionTelemetryProcessor.cs @@ -11,6 +11,7 @@ using Microsoft.Health.Fhir.Ingest.Data; using Microsoft.Health.Fhir.Ingest.Service; using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; namespace Microsoft.Health.Fhir.Ingest.Telemetry { diff --git a/src/lib/Microsoft.Health.Logger/Microsoft.Health.Logging.csproj b/src/lib/Microsoft.Health.Logger/Microsoft.Health.Logging.csproj new file mode 100644 index 00000000..cc6efe0a --- /dev/null +++ b/src/lib/Microsoft.Health.Logger/Microsoft.Health.Logging.csproj @@ -0,0 +1,16 @@ + + + + netcoreapp3.1 + + + + + + + + + + + + diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/ITelemetryLogger.cs b/src/lib/Microsoft.Health.Logger/Telemetry/ITelemetryLogger.cs similarity index 92% rename from src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/ITelemetryLogger.cs rename to src/lib/Microsoft.Health.Logger/Telemetry/ITelemetryLogger.cs index 1ee00d37..fc2f597c 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/ITelemetryLogger.cs +++ b/src/lib/Microsoft.Health.Logger/Telemetry/ITelemetryLogger.cs @@ -3,10 +3,10 @@ // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- -using System; using Microsoft.Health.Common.Telemetry; +using System; -namespace Microsoft.Health.Fhir.Ingest.Telemetry +namespace Microsoft.Health.Logging.Telemetry { public interface ITelemetryLogger { diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/IomtTelemetryLogger.cs b/src/lib/Microsoft.Health.Logger/Telemetry/IomtTelemetryLogger.cs similarity index 93% rename from src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/IomtTelemetryLogger.cs rename to src/lib/Microsoft.Health.Logger/Telemetry/IomtTelemetryLogger.cs index c071e51a..5647c75a 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/IomtTelemetryLogger.cs +++ b/src/lib/Microsoft.Health.Logger/Telemetry/IomtTelemetryLogger.cs @@ -6,9 +6,9 @@ using System; using EnsureThat; using Microsoft.ApplicationInsights; -using Microsoft.Health.Fhir.Ingest.Telemetry.Metrics; +using Microsoft.Health.Logging.Metrics.Telemetry; -namespace Microsoft.Health.Fhir.Ingest.Telemetry +namespace Microsoft.Health.Logging.Telemetry { public class IomtTelemetryLogger : ITelemetryLogger { diff --git a/src/lib/Microsoft.Health.Logger/Telemetry/Metrics/MetricExtensionMethods.cs b/src/lib/Microsoft.Health.Logger/Telemetry/Metrics/MetricExtensionMethods.cs new file mode 100644 index 00000000..6d82df48 --- /dev/null +++ b/src/lib/Microsoft.Health.Logger/Telemetry/Metrics/MetricExtensionMethods.cs @@ -0,0 +1,258 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using EnsureThat; +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Metrics; + +namespace Microsoft.Health.Logging.Metrics.Telemetry +{ + public static class MetricExtensionMethods + { + private static string _namespace = MetricIdentifier.DefaultMetricNamespace; + + public static void LogMetric(this Common.Telemetry.Metric metric, TelemetryClient telemetryClient, double metricValue) + { + EnsureArg.IsNotNull(metric); + EnsureArg.IsNotNull(telemetryClient); + + var metricName = metric.Name; + var dimensions = metric.Dimensions; + var dimensionNumber = metric.Dimensions.Count; + + if (dimensionNumber > 10) + { + telemetryClient.TrackException( + new Exception($"Metric {metricName} exceeds the amount of allowed dimensions")); + return; + } + + string[] dimNames = new string[dimensions.Count]; + dimensions.Keys.CopyTo(dimNames, 0); + + string[] dimValues = new string[dimNames.Length]; + int count = 0; + foreach (string dimName in dimNames) + { + dimValues[count] = dimensions[dimName].ToString(); + count++; + } + + dimensions.Values.CopyTo(dimValues, 0); + + switch (dimensionNumber) + { + case 0: + telemetryClient + .GetMetric( + metricName) + .TrackValue( + metricValue); + break; + case 1: + telemetryClient + .GetMetric( + metricName, + dimNames[0]) + .TrackValue( + metricValue, + dimValues[0]); + break; + case 2: + telemetryClient + .GetMetric( + metricName, + dimNames[0], + dimNames[1]) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1]); + break; + case 3: + telemetryClient + .GetMetric( + metricName, + dimNames[0], + dimNames[1], + dimNames[2]) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2]); + break; + case 4: + telemetryClient + .GetMetric( + metricName, + dimNames[0], + dimNames[1], + dimNames[2], + dimNames[3]) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2], + dimValues[3]); + break; + case 5: + var metric5DId = new MetricIdentifier( + _namespace, + metricName, + dimNames[0], + dimNames[1], + dimNames[2], + dimNames[3], + dimNames[4]); + + telemetryClient + .GetMetric(metric5DId) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2], + dimValues[3], + dimValues[4]); + break; + case 6: + var metric6DId = new MetricIdentifier( + _namespace, + metricName, + dimNames[0], + dimNames[1], + dimNames[2], + dimNames[3], + dimNames[4], + dimNames[5]); + + telemetryClient + .GetMetric(metric6DId) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2], + dimValues[3], + dimValues[4], + dimValues[5]); + break; + case 7: + var metric7DId = new MetricIdentifier( + _namespace, + metricName, + dimNames[0], + dimNames[1], + dimNames[2], + dimNames[3], + dimNames[4], + dimNames[5], + dimNames[6]); + + telemetryClient + .GetMetric(metric7DId) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2], + dimValues[3], + dimValues[4], + dimValues[5], + dimValues[6]); + break; + case 8: + var metric8DId = new MetricIdentifier( + _namespace, + metricName, + dimNames[0], + dimNames[1], + dimNames[2], + dimNames[3], + dimNames[4], + dimNames[5], + dimNames[6], + dimNames[7]); + + telemetryClient + .GetMetric(metric8DId) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2], + dimValues[3], + dimValues[4], + dimValues[5], + dimValues[6], + dimValues[7]); + break; + case 9: + var metric9DId = new MetricIdentifier( + _namespace, + metricName, + dimNames[0], + dimNames[1], + dimNames[2], + dimNames[3], + dimNames[4], + dimNames[5], + dimNames[6], + dimNames[7], + dimNames[8]); + + telemetryClient + .GetMetric(metric9DId) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2], + dimValues[3], + dimValues[4], + dimValues[5], + dimValues[6], + dimValues[7], + dimValues[8]); + break; + case 10: + var metric10DId = new MetricIdentifier( + _namespace, + metricName, + dimNames[0], + dimNames[1], + dimNames[2], + dimNames[3], + dimNames[4], + dimNames[5], + dimNames[6], + dimNames[7], + dimNames[8], + dimNames[9]); + + telemetryClient + .GetMetric(metric10DId) + .TrackValue( + metricValue, + dimValues[0], + dimValues[1], + dimValues[2], + dimValues[3], + dimValues[4], + dimValues[5], + dimValues[6], + dimValues[7], + dimValues[8], + dimValues[9]); + break; + default: + break; + } + } + } +} diff --git a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs index 7988457a..8ad0ef70 100644 --- a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs +++ b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementEventNormalizationServiceTests.cs @@ -9,8 +9,8 @@ using Microsoft.Azure.EventHubs; using Microsoft.Azure.WebJobs; using Microsoft.Health.Fhir.Ingest.Data; -using Microsoft.Health.Fhir.Ingest.Telemetry; using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; using Newtonsoft.Json.Linq; using NSubstitute; using Xunit; diff --git a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementFhirImportServiceTests.cs b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementFhirImportServiceTests.cs index 740e0578..ffac86fd 100644 --- a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementFhirImportServiceTests.cs +++ b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Service/MeasurementFhirImportServiceTests.cs @@ -14,6 +14,7 @@ using Microsoft.Health.Fhir.Ingest.Data; using Microsoft.Health.Fhir.Ingest.Telemetry; using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; using Microsoft.Health.Tests.Common; using Newtonsoft.Json; using NSubstitute; diff --git a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Telemetry/ExceptionTelemetryProcessorTests.cs b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Telemetry/ExceptionTelemetryProcessorTests.cs index fc98b96d..f8f32ca5 100644 --- a/test/Microsoft.Health.Fhir.Ingest.UnitTests/Telemetry/ExceptionTelemetryProcessorTests.cs +++ b/test/Microsoft.Health.Fhir.Ingest.UnitTests/Telemetry/ExceptionTelemetryProcessorTests.cs @@ -8,6 +8,7 @@ using Microsoft.Health.Extensions.Fhir; using Microsoft.Health.Fhir.Ingest.Service; using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; using NSubstitute; using Xunit;