Console app prototype for stream analytics replacement #78
src/lib/Microsoft.Health.Events/EventConsumers/Service/Infrastructure/EventQueueWindow.cs
{
    while (currentEnqueudTime >= _windowEnd)
    {
        _windowEnd = _windowEnd.Add(_flushTimespan);
You should be able to do this with math. The difference between the two times is a timespan; dividing it by the flush timespan tells you how many windows the event lies past the current window end. You can then advance the window end by that many timespans in one step instead of looping.
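A minimal sketch of that suggestion, not the code that was merged. The names `WindowMath` and `AdvanceWindowEnd` are hypothetical, and `DateTime` is an assumption about the field types. It is written to give the same result as the `while` loop above for a positive flush timespan.

```csharp
using System;

// Hypothetical helper; not part of the PR.
public static class WindowMath
{
    // Returns the first window end strictly after enqueuedTime, stepping
    // from windowEnd in whole multiples of flushTimespan.
    public static DateTime AdvanceWindowEnd(DateTime windowEnd, DateTime enqueuedTime, TimeSpan flushTimespan)
    {
        if (flushTimespan <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(flushTimespan), "Flush timespan must be positive.");
        }

        // The event already falls inside the current window; nothing to advance.
        if (enqueuedTime < windowEnd)
        {
            return windowEnd;
        }

        // Whole windows between the current end and the event, plus one so the
        // new end is strictly after the event (mirrors the `>=` loop condition).
        long steps = ((enqueuedTime - windowEnd).Ticks / flushTimespan.Ticks) + 1;
        return windowEnd.AddTicks(steps * flushTimespan.Ticks);
    }
}
```

With a 60 second flush timespan and an event enqueued 150 seconds past the window end, this yields three steps, which is what the loop would reach after three iterations.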
{
    Console.WriteLine($"Threshold wait reached. Flushing {_eventQueues[queueId].GetQueueCount()} events up to: {windowEnd}");
    var events = await GetQueue(queueId).Flush(windowEnd);
    await _eventConsumerService.ConsumeEvents(events);
We need to increment the window time here as well, correct? If we don't, any new event that arrives will immediately trigger a new batch.
I don't think we need to increment here. This method is only called when we haven't received an event in a while, and it will only flush if the window end is sufficiently in the past.
If we get a new event, it has to fall outside the current window end, so we will flush the previous window (which has 0 events) and advance the window to contain the new event.
The main reason I didn't want to increment here is to avoid a race condition with the other flushing mechanisms (ThresholdTimeReached and ThresholdCountReached). Perhaps this could be designed better, though.
Is there a race condition? They should be processed on the same thread. We had a race condition in my version because a timer was triggering the processing.
public void ConfigureServices(IServiceCollection services)
{
    var outputEventHubConnection = Configuration.GetSection("OutputEventHub").Value;
    var outputEventHubName = outputEventHubConnection.Substring(outputEventHubConnection.LastIndexOf('=') + 1);
Should be able to use the event hub connection string builder here.
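To illustrate why `LastIndexOf('=')` is fragile: it only works when `EntityPath` is the last segment and nothing after it contains `=`. The sketch below parses by key using only the base class library. `ConnectionStringParts` and its methods are hypothetical names, and the sample connection string in the note is made up. In the real code the connection string parser that ships with the Event Hubs SDK, as the reviewer suggests, would be the better choice.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; a stand-in for the SDK's own parser.
public static class ConnectionStringParts
{
    // Splits "Key=Value;Key=Value" into a case-insensitive dictionary.
    public static IReadOnlyDictionary<string, string> Parse(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("Connection string is required.", nameof(connectionString));
        }

        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (var segment in connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries))
        {
            // Split on the first '=' only, so values such as a base64 key
            // that end in '=' stay intact.
            int separator = segment.IndexOf('=');
            if (separator <= 0)
            {
                continue;
            }

            parts[segment.Substring(0, separator).Trim()] = segment.Substring(separator + 1).Trim();
        }

        return parts;
    }

    // Reads the event hub name from the EntityPath segment, wherever it appears.
    public static string GetEventHubName(string connectionString)
    {
        return Parse(connectionString).TryGetValue("EntityPath", out var name) && name.Length > 0
            ? name
            : throw new FormatException("Connection string does not contain an EntityPath.");
    }
}
```

For a string such as `EntityPath=devicedata;Endpoint=sb://example.servicebus.windows.net/;SharedAccessKey=abc123=`, the `Substring(LastIndexOf('=') + 1)` approach returns an empty string, while the key-based lookup still finds `devicedata`.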
Once we move to Managed Identity, we probably want to have settings for the different components the connection string represents (minus the SAS token):
ConsumerGroup
FullyQualifiedNamespace
EventHubName
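One way those three settings could be grouped, as a rough sketch only. The class name `EventHubOptionsSketch`, the `Validate` method, and the `$Default` fallback for the consumer group are assumptions made for illustration, not something this PR defines.

```csharp
using System;

// Hypothetical settings type; the property names mirror the list above,
// everything else is an assumption.
public sealed class EventHubOptionsSketch
{
    public string FullyQualifiedNamespace { get; set; }

    public string EventHubName { get; set; }

    // Assumed fallback: Event Hubs' default consumer group name.
    public string ConsumerGroup { get; set; } = "$Default";

    // Fails fast when a required setting is missing from configuration.
    public void Validate()
    {
        if (string.IsNullOrWhiteSpace(FullyQualifiedNamespace))
        {
            throw new InvalidOperationException("FullyQualifiedNamespace is required.");
        }

        if (string.IsNullOrWhiteSpace(EventHubName))
        {
            throw new InvalidOperationException("EventHubName is required.");
        }

        if (string.IsNullOrWhiteSpace(ConsumerGroup))
        {
            throw new InvalidOperationException("ConsumerGroup is required.");
        }
    }
}
```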
Yes, once this is in I am planning on making stories for supporting MI. There are a few things I would like to iterate on overall in this code, but I would like to get this initial build committed to master as a base so we have the history.
@wi-y I re-reviewed. I think you addressed most of the must fix items. I would like to get at least one other person's review here though and get another point of view. It also looks like the build isn't triggering for this PR. Is that something you could look into?
Overall I think we may want to iterate on this some more but rather than do it here I would like to get into master and do follow up with some additional PRs. Make sense?
@dustinburson thank you for reviewing. I agree with getting this into master and iterating. I'll look into the builds not triggering.
    return Task.CompletedTask;
}

async Task ProcessInitializingHandler(PartitionInitializingEventArgs initArgs)
Simplifying the codebase by using the built-in checkpointing sounds like a reasonable suggestion. If we find that the extra values being checkpointed (i.e., enqueuedDateTime) don't provide value, then we could explore a refactor.
private ITelemetryLogger _logger;

public Processor(
    [Blob("template/%Template:FhirMapping%", FileAccess.Read)] string templateDefinition,
I was required to replace ":" with "_" in order to deploy this in a Linux container.
    _publisherTimer.Enabled = true;
}

private async void OnTimedEvent(object source, ElapsedEventArgs e)
I agree with Dustin. I think the checkpoint should be the "source of truth" for completed work. Should this process crash for some reason, we could lose checkpoints that were only saved in memory. Also, ListCheckpointsAsync only lists checkpoints that are in blob storage and will not include the checkpoints held in memory. This seems like trouble.
    DateTimeOffset enqueuedTime,
    IDictionary<string, object> properties,
    IReadOnlyDictionary<string, object> systemProperties)
{
I'm wondering if it makes sense to do null checks here. It seems like some of these properties are "required", so enforcing this on the initializer might make sense. That or maybe in the factory.
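A sketch of what enforcing this in the initializer could look like. Only the three parameters visible in the diff above are used; the class name `EventMessageSketch` and the property names are hypothetical, and which arguments are actually "required" is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type; stands in for the class whose constructor is shown above.
public sealed class EventMessageSketch
{
    public EventMessageSketch(
        DateTimeOffset enqueuedTime,
        IDictionary<string, object> properties,
        IReadOnlyDictionary<string, object> systemProperties)
    {
        // DateTimeOffset is a value type and cannot be null, so only the
        // reference-type arguments need a guard.
        EnqueuedTime = enqueuedTime;
        Properties = properties ?? throw new ArgumentNullException(nameof(properties));
        SystemProperties = systemProperties ?? throw new ArgumentNullException(nameof(systemProperties));
    }

    public DateTimeOffset EnqueuedTime { get; }

    public IDictionary<string, object> Properties { get; }

    public IReadOnlyDictionary<string, object> SystemProperties { get; }
}
```

Guarding in the constructor means every instance is valid no matter which caller builds it. Guarding in the factory instead would keep the type itself permissive, which may matter if tests construct it directly.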
* template and script for deploying sa replacement as webjobs
* fix template bug
* log measurement metrics
* fix params
* update arm template and deploy script
* set alwaysOn
* fix errors/telemetry
…github.com/microsoft/iomt-fhir into personal/wiyochum/event-hub-read-and-batch
This is the initial prototype for an OSS stream analytics replacement console app.
Before running the app, you will need to fill out appsettings.json with the appropriate event hub and storage account information, as well as instruct the app to read from the appropriate event hub (determined by Console:EventHub).
To run the app, navigate to the console folder of the project, set Microsoft.Health.Fhir.Ingest.Console as the startup project, and then build and run. The console logs the number of events processed for each window.
You can adjust the window in appsettings.json via EventBatching:FlushTimespan and EventBatching:MaxEvents.
There are some todos remaining which include: