Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Console app prototype for stream analytics replacement #78

Merged
merged 17 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions Microsoft.Health.Fhir.Ingest.sln
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,17 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "healthkitOnFhir", "healthki
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Template", "src\lib\Microsoft.Health.Fhir.Ingest.Template\Microsoft.Health.Fhir.Ingest.Template.csproj", "{85D653A7-0E63-4751-8904-728156807A14}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Ingest.Schema", "src\lib\Microsoft.Health.Fhir.Ingest.Schema\Microsoft.Health.Fhir.Ingest.Schema.csproj", "{A85AB6BC-698C-460F-81D4-9D1D2BD14B71}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Schema", "src\lib\Microsoft.Health.Fhir.Ingest.Schema\Microsoft.Health.Fhir.Ingest.Schema.csproj", "{A85AB6BC-698C-460F-81D4-9D1D2BD14B71}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Ingest.Template.UnitTests", "test\Microsoft.Health.Fhir.Ingest.Template.UnitTests\Microsoft.Health.Fhir.Ingest.Template.UnitTests.csproj", "{EE072537-807D-4FE2-BFEB-424B64DCD7F9}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Template.UnitTests", "test\Microsoft.Health.Fhir.Ingest.Template.UnitTests\Microsoft.Health.Fhir.Ingest.Template.UnitTests.csproj", "{EE072537-807D-4FE2-BFEB-424B64DCD7F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Events", "src\lib\Microsoft.Health.Events\Microsoft.Health.Events.csproj", "{22275DE3-859D-40F0-9547-7711568164C0}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "console", "console", "{1EF3584A-C437-4B45-8BF8-1597D5A8DBC7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Ingest.Console", "src\console\Microsoft.Health.Fhir.Ingest.Console.csproj", "{927BC214-ABD9-4A1B-9F7C-75973513D141}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Logger", "src\lib\Microsoft.Health.Logger\Microsoft.Health.Logger.csproj", "{05123BAE-E96E-4C7E-95CB-C616DF940F17}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -147,6 +155,18 @@ Global
{EE072537-807D-4FE2-BFEB-424B64DCD7F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EE072537-807D-4FE2-BFEB-424B64DCD7F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EE072537-807D-4FE2-BFEB-424B64DCD7F9}.Release|Any CPU.Build.0 = Release|Any CPU
{22275DE3-859D-40F0-9547-7711568164C0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{22275DE3-859D-40F0-9547-7711568164C0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{22275DE3-859D-40F0-9547-7711568164C0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{22275DE3-859D-40F0-9547-7711568164C0}.Release|Any CPU.Build.0 = Release|Any CPU
{927BC214-ABD9-4A1B-9F7C-75973513D141}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{927BC214-ABD9-4A1B-9F7C-75973513D141}.Debug|Any CPU.Build.0 = Debug|Any CPU
{927BC214-ABD9-4A1B-9F7C-75973513D141}.Release|Any CPU.ActiveCfg = Release|Any CPU
{927BC214-ABD9-4A1B-9F7C-75973513D141}.Release|Any CPU.Build.0 = Release|Any CPU
{05123BAE-E96E-4C7E-95CB-C616DF940F17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{05123BAE-E96E-4C7E-95CB-C616DF940F17}.Debug|Any CPU.Build.0 = Debug|Any CPU
{05123BAE-E96E-4C7E-95CB-C616DF940F17}.Release|Any CPU.ActiveCfg = Release|Any CPU
{05123BAE-E96E-4C7E-95CB-C616DF940F17}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -172,6 +192,9 @@ Global
{85D653A7-0E63-4751-8904-728156807A14} = {513D67B4-80E1-476D-955F-E7E7C79D144A}
{A85AB6BC-698C-460F-81D4-9D1D2BD14B71} = {513D67B4-80E1-476D-955F-E7E7C79D144A}
{EE072537-807D-4FE2-BFEB-424B64DCD7F9} = {FAF8B402-892E-4EA2-B4CF-69B0C70BA762}
{22275DE3-859D-40F0-9547-7711568164C0} = {513D67B4-80E1-476D-955F-E7E7C79D144A}
{927BC214-ABD9-4A1B-9F7C-75973513D141} = {1EF3584A-C437-4B45-8BF8-1597D5A8DBC7}
{05123BAE-E96E-4C7E-95CB-C616DF940F17} = {513D67B4-80E1-476D-955F-E7E7C79D144A}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A358924D-F948-4AE8-8CD0-A0F56225CE0C}
Expand Down
34 changes: 34 additions & 0 deletions src/console/IomtLogging.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using EnsureThat;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Health.Logger.Telemetry;

namespace Microsoft.Health.Fhir.Ingest.Console
{
public class IomtLogging
wi-y marked this conversation as resolved.
Show resolved Hide resolved
{
public IomtLogging(IConfiguration configuration)
{
Configuration = configuration;
}

public IConfiguration Configuration { get; }

public void ConfigureServices(IServiceCollection services)
{
EnsureArg.IsNotNull(services, nameof(services));

var instrumentationKey = Configuration.GetSection("APPINSIGHTS_INSTRUMENTATIONKEY").Value;

services.TryAddSingleton<ITelemetryLogger>(sp =>
{
var config = new TelemetryConfiguration(instrumentationKey);
var telemetryClient = new TelemetryClient(config);
return new IomtTelemetryLogger(telemetryClient);
});
}
}
}
48 changes: 48 additions & 0 deletions src/console/MeasurementCollectionToFhir/Processor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// -------------------------------------------------------------------------------------------------
wi-y marked this conversation as resolved.
Show resolved Hide resolved
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.WebJobs;
using Microsoft.Health.Events.EventConsumers;
using Microsoft.Health.Events.Model;
using Microsoft.Health.Fhir.Ingest.Console.Template;
using Microsoft.Health.Fhir.Ingest.Host;
using Microsoft.Health.Fhir.Ingest.Service;
using Microsoft.Health.Logger.Telemetry;

namespace Microsoft.Health.Fhir.Ingest.Console.MeasurementCollectionToFhir
{
public class Processor : IEventConsumer
{
private ITemplateManager _templateManager;
private MeasurementFhirImportService _measurementImportService;
private string _templateDefinition;
private ITelemetryLogger _logger;

public Processor(
[Blob("template/%Template:FhirMapping%", FileAccess.Read)] string templateDefinition,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was required to replace ":" with "_" in order to deploy this in a Linux container.

ITemplateManager templateManager,
[MeasurementFhirImport] MeasurementFhirImportService measurementImportService,
ITelemetryLogger logger)
{
_templateDefinition = templateDefinition;
_templateManager = templateManager;
_measurementImportService = measurementImportService;
_logger = logger;
}

public async Task ConsumeAsync(IEnumerable<IEventMessage> events)
{
EnsureArg.IsNotNull(events);
EnsureArg.IsNotNull(_templateDefinition);

var templateContent = _templateManager.GetTemplateAsString(_templateDefinition);
await _measurementImportService.ProcessEventsAsync(events, templateContent, _logger).ConfigureAwait(false);
}
}
}
73 changes: 73 additions & 0 deletions src/console/MeasurementCollectionToFhir/ProcessorStartup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using EnsureThat;
using Hl7.Fhir.Model;
using Hl7.Fhir.Rest;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Health.Common;
using Microsoft.Health.Extensions.Fhir;
using Microsoft.Health.Extensions.Fhir.Config;
using Microsoft.Health.Fhir.Ingest.Config;
using Microsoft.Health.Fhir.Ingest.Host;
using Microsoft.Health.Fhir.Ingest.Service;
using Microsoft.Health.Fhir.Ingest.Template;
using System;
using System.Linq;

namespace Microsoft.Health.Fhir.Ingest.Console.MeasurementCollectionToFhir
{
public class ProcessorStartup
{
public ProcessorStartup(IConfiguration configuration)
{
Configuration = configuration;
}

public IConfiguration Configuration { get; }

public void ConfigureServices(IServiceCollection services)
{
Configuration.GetSection("FhirService")
.GetChildren()
.ToList()
.ForEach(env => Environment.SetEnvironmentVariable(env.Path, env.Value));

services.Configure<ResourceIdentityOptions>(Configuration.GetSection("ResourceIdentity"));
services.Configure<FhirClientFactoryOptions>(Configuration.GetSection("FhirClient"));

services.TryAddSingleton<IFactory<IFhirClient>, FhirClientFactory>();
services.TryAddSingleton(sp => sp.GetRequiredService<IFactory<IFhirClient>>().Create());
services.TryAddSingleton<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Observation>, R4FhirLookupTemplateProcessor>();
services.TryAddSingleton(ResolveResourceIdentityService);
services.TryAddSingleton<IMemoryCache>(sp => new MemoryCache(Options.Create(new MemoryCacheOptions { SizeLimit = 5000 })));
services.TryAddSingleton<FhirImportService, R4FhirImportService>();

services.TryAddSingleton<MeasurementFhirImportOptions>();
services.TryAddSingleton<MeasurementFhirImportService>();
services.TryAddSingleton(ResolveMeasurementImportProvider);
}

private MeasurementFhirImportProvider ResolveMeasurementImportProvider(IServiceProvider serviceProvider)
{
EnsureArg.IsNotNull(serviceProvider, nameof(serviceProvider));

IOptions<MeasurementFhirImportOptions> options = Options.Create(new MeasurementFhirImportOptions());
var logger = new LoggerFactory();
var measurementImportService = new MeasurementFhirImportProvider(Configuration, options, logger, serviceProvider);

return measurementImportService;
}

private static IResourceIdentityService ResolveResourceIdentityService(IServiceProvider serviceProvider)
{
EnsureArg.IsNotNull(serviceProvider, nameof(serviceProvider));

var fhirClient = serviceProvider.GetRequiredService<IFhirClient>();
var resourceIdentityOptions = serviceProvider.GetRequiredService<IOptions<ResourceIdentityOptions>>();
return ResourceIdentityServiceFactory.Instance.Create(resourceIdentityOptions.Value, fhirClient);
}
}
}
27 changes: 27 additions & 0 deletions src/console/Microsoft.Health.Fhir.Ingest.Console.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Mvc.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.Core" Version="2.2.5" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="4.0.3" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\lib\Microsoft.Health.Events\Microsoft.Health.Events.csproj" />
<ProjectReference Include="..\lib\Microsoft.Health.Fhir.Ingest\Microsoft.Health.Fhir.Ingest.csproj" />
<ProjectReference Include="..\lib\Microsoft.Health.Fhir.R4.Ingest\Microsoft.Health.Fhir.R4.Ingest.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>
94 changes: 94 additions & 0 deletions src/console/Normalize/Processor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using EnsureThat;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Microsoft.Health.Events.EventConsumers;
using Microsoft.Health.Events.Model;
using Microsoft.Health.Fhir.Ingest.Config;
using Microsoft.Health.Fhir.Ingest.Console.Template;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Service;
using Microsoft.Health.Fhir.Ingest.Telemetry;
using Microsoft.Health.Fhir.Ingest.Template;
using Microsoft.Health.Logger.Telemetry;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using static Microsoft.Azure.EventHubs.EventData;

namespace Microsoft.Health.Fhir.Ingest.Console.Normalize
{
public class Processor : IEventConsumer
{
private string _templateDefinition;
private ITemplateManager _templateManager;
private ITelemetryLogger _logger;
private IConfiguration _env;
private IOptions<EventHubMeasurementCollectorOptions> _options;

public Processor(
[Blob("template/%Template:DeviceContent%", FileAccess.Read)] string templateDefinition,
ITemplateManager templateManager,
IConfiguration configuration,
IOptions<EventHubMeasurementCollectorOptions> collectorOptions,
ITelemetryLogger logger)
{
_templateDefinition = templateDefinition;
_templateManager = templateManager;
_logger = logger;
_env = configuration;
_options = collectorOptions;
}

public async Task ConsumeAsync(IEnumerable<IEventMessage> events)
{
EnsureArg.IsNotNull(_templateDefinition);
var templateContent = _templateManager.GetTemplateAsString(_templateDefinition);

var templateContext = CollectionContentTemplateFactory.Default.Create(templateContent);
templateContext.EnsureValid();
var template = templateContext.Template;

_logger.LogMetric(
IomtMetrics.DeviceEvent(),
events.Count());

IEnumerable<EventData> eventHubEvents = events
.Select(x =>
{
var eventData = new EventData(x.Body.ToArray());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would modify the the below code to also work with your new Event class. For now I would suggest either having the Event class wrap the original EventData so we can just pass it around or create extension methods that support converting back and worth to simplify the code here.

eventData.SystemProperties = new SystemPropertiesCollection(
x.SequenceNumber,
x.EnqueuedTime.UtcDateTime,
x.Offset.ToString(),
x.PartitionId);

foreach (KeyValuePair<string, object> entry in x.SystemProperties)
{
eventData.SystemProperties.TryAdd(entry.Key, entry.Value);
}

return eventData;
});

var dataNormalizationService = new MeasurementEventNormalizationService(_logger, template);

// todo: support managed identity
var connectionString = _env.GetSection("OutputEventHub").Value;
wi-y marked this conversation as resolved.
Show resolved Hide resolved
var sb = new EventHubsConnectionStringBuilder(connectionString);
var eventHubName = sb.EntityPath;

var collector = CreateCollector(eventHubName, connectionString, _options);

await dataNormalizationService.ProcessAsync(eventHubEvents, collector).ConfigureAwait(false);
}

private IAsyncCollector<IMeasurement> CreateCollector(string eventHubName, string connectionString, IOptions<EventHubMeasurementCollectorOptions> options)
{
var client = options.Value.GetEventHubClient(eventHubName, connectionString);
return new MeasurementToEventAsyncCollector(new EventHubService(client));
}
}
}
36 changes: 36 additions & 0 deletions src/console/Normalize/ProcessorStartup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using EnsureThat;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Ingest.Config;

namespace Microsoft.Health.Fhir.Ingest.Console.Normalize
{
public class ProcessorStartup
{
public ProcessorStartup(IConfiguration configuration)
{
Configuration = configuration;
}

public IConfiguration Configuration { get; }

public void ConfigureServices(IServiceCollection services)
{
var outputEventHubConnection = Configuration.GetSection("OutputEventHub").Value;
var outputEventHubName = outputEventHubConnection.Substring(outputEventHubConnection.LastIndexOf('=') + 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var outputEventHubName = outputEventHubConnection.Substring(outputEventHubConnection.LastIndexOf('=') + 1); [](start = 12, length = 107)

Should be able to use the event hub connection string builder here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we move to Managed Identity, we probably want to have setting for the different components the connection string represents (minus the SAS token):
ConsumerGroup
FullyQualifiedNamespace
EventHubName

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, once this is in I am planning on making stories for supporting MI. There are few things I would like to iterate on overall in this code but I would like to this initial build in as a base committed to master so we have the history.


In reply to: 544605846 [](ancestors = 544605846)


EnsureArg.IsNotNullOrEmpty(outputEventHubConnection);
EnsureArg.IsNotNullOrEmpty(outputEventHubName);

services.Configure<EventHubMeasurementCollectorOptions>(options =>
{
options.AddSender(outputEventHubName, outputEventHubConnection);
});
}
}
}
Loading