Personal/duburson/large msg perf improvements v2 #209
Conversation
…rocesses events immediately while emitting progress metrics and checkpointing when thresholds are hit.
* Update EventMessageJTokenCoverter to add properties directly to the JObject rather than new object declarations
* Add missing metrics to NormalizationStreamingEventConsumerService
Add DeviceEvents to new Normalization
* Send projected events as a batch rather than one at a time.
…EventNormalizationService + Normalize.Processor
* Move ITemplateManager and TemplateManager into the Ingest library so they can be referenced by the new class.
await _retryPolicy.ExecuteAsync(async () => await ConsumeAsyncImpl(events));
}

private Task<IContentTemplate> GetNormalizationTemplate()
One concern I have is that with a lower batch size (10), we are refreshing the template much more frequently. I'm going to look at ways to reduce the impact.
…n template if content on blob is modified.
await _retryPolicy.ExecuteAsync(async () => await ConsumeAsyncImpl(events));
}

private async Task<IContentTemplate> GetNormalizationTemplate()
Still not 100% happy with this. The main issue is that the code is executed per partition on different threads, hence the need for a semaphore to control concurrency when there isn't any horizontal scaling. A better approach long term would be to move this to a separate, injected class that monitors the file and updates the template when there are changes.
Note: a SemaphoreSlim is used over a ReaderWriterLockSlim because ReaderWriterLockSlim doesn't work with the async calls used in the function. Its state is stored in a thread-local variable, so when the awaited call returns, unless we get the same thread back, the lock is lost.
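For illustration, here is a minimal sketch of that pattern (the type and method names are hypothetical, not the actual PR code): a SemaphoreSlim guarding an async load, which holds across await points where a ReaderWriterLockSlim would not.

using System;
using System.Threading;
using System.Threading.Tasks;

// T would be IContentTemplate in this codebase.
public class AsyncCachedValue<T>
    where T : class
{
    // SemaphoreSlim exposes WaitAsync, so the lock can be held across awaits.
    // ReaderWriterLockSlim tracks ownership in thread-local state, which is
    // lost when an awaited call resumes on a different thread.
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private T _value;

    public async Task<T> GetAsync(Func<Task<T>> loadAsync)
    {
        await _semaphore.WaitAsync();
        try
        {
            if (_value == null)
            {
                // Execution may resume on a different thread here; the
                // semaphore doesn't care which thread releases it.
                _value = await loadAsync();
            }

            return _value;
        }
        finally
        {
            _semaphore.Release();
        }
    }
}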
I had taken a stab at optimizing this in the past. My approach entailed wrapping the current TemplateManager in a CachingTemplateManager and caching the results for X minutes. The reason I liked this approach is that it eliminates the majority of network requests to the storage account.
A cache miss could result in multiple threads (one per partition) attempting to fill the cache. But I felt that is ok as the multiple attempts would fill the cache with the same data. Locking could also be implemented inside of the cache population delegate if we wanted to eliminate that.
The big downside here is we go from instantly picking up customer changes to forcing a new read at regular intervals. So that change would need to be articulated to customers. But even setting the interval to 60 seconds could save hundreds of network calls to the underlying storage account, which would be very useful.
Code is below
using System;
using Microsoft.Extensions.Caching.Memory;

namespace Microsoft.Health.Fhir.Ingest.Console.Template
{
    public class CachingTemplateManager : ITemplateManager
    {
        private readonly ITemplateManager _wrappedTemplateManager;
        private readonly IMemoryCache _templateCache;

        public CachingTemplateManager(
            ITemplateManager wrappedTemplateManager,
            IMemoryCache cache)
        {
            _wrappedTemplateManager = wrappedTemplateManager;
            _templateCache = cache;
        }

        public byte[] GetTemplate(string templateName)
        {
            // Distinct key so the byte[] and string entries don't collide.
            var key = $"{templateName}Bytes";

            return _templateCache.GetOrCreate(key, e =>
            {
                // Entries expire after one minute, bounding how stale a template can be.
                e.SetAbsoluteExpiration(TimeSpan.FromMinutes(1));
                return _wrappedTemplateManager.GetTemplate(templateName);
            });
        }

        public string GetTemplateAsString(string templateName)
        {
            return _templateCache.GetOrCreate(templateName, e =>
            {
                e.SetAbsoluteExpiration(TimeSpan.FromMinutes(1));
                return _wrappedTemplateManager.GetTemplateAsString(templateName);
            });
        }
    }
}
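For completeness, the wiring could look something like the following: a hedged sketch assuming an IServiceCollection named services, not the actual container setup in this repo.

// Hypothetical DI registration: AddMemoryCache supplies the IMemoryCache,
// and the caching manager decorates the blob-backed TemplateManager.
services.AddMemoryCache();
services.AddSingleton<TemplateManager>();
services.AddSingleton<ITemplateManager>(sp =>
    new CachingTemplateManager(
        sp.GetRequiredService<TemplateManager>(),
        sp.GetRequiredService<IMemoryCache>()));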
Thanks Rob, great suggestion, and something similar to this is what I was thinking we would transition to. I hadn't considered using a cache, but that makes sense. I was thinking we would have a task that fires every so often (5 minutes?), checks for changes, and updates the templates if needed.
A piece that I want to include is generating the full template, i.e. I think the TemplateManager should ultimately build and return the completed IContentTemplate. Otherwise each thread processing a partition is still building new IContentTemplates in memory, increasing our memory pressure and garbage collection.
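A rough sketch of that timer-based idea (the class name, interval, and build delegate are hypothetical placeholders):

using System;
using System.Threading;

// IContentTemplate comes from the ingest library; everything else here is
// a hypothetical sketch of the timer-based refresh idea.
public class PollingTemplateProvider : IDisposable
{
    private readonly Timer _timer;
    private volatile IContentTemplate _current;

    public PollingTemplateProvider(Func<IContentTemplate> buildTemplate)
    {
        // Build once up front, then rebuild every 5 minutes. Partition
        // threads read the prebuilt template instead of each constructing
        // their own copy, reducing memory pressure and garbage collection.
        _timer = new Timer(
            _ => _current = buildTemplate(),
            null,
            TimeSpan.Zero,
            TimeSpan.FromMinutes(5));
    }

    public IContentTemplate GetTemplate() => _current;

    public void Dispose() => _timer.Dispose();
}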
Agreed, it would make sense for the TemplateManager to build the completed template. We can always do a follow-up PR to use caching or your suggested timer-based approach. Both options reduce the number of network calls to the storage account as well as the cost of repeatedly building the completed IContentTemplate.
I think ultimately, we would want to use something like Blob Storage Events if possible. That way we can eliminate all unnecessary storage calls.
Actually, since the only way to update mappings is through a provisioning operation, is there some way we can set an environment variable (maybe the template CONTENT-MD5 hash) to signal when we need to fetch an updated mapping?
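A sketch of what that check might look like (the environment variable name and the surrounding flow are hypothetical, purely to illustrate the suggestion):

using System;

public class EnvHashTemplateRefresher
{
    private string _lastSeenHash;

    // Provisioning would set e.g. TEMPLATE_CONTENT_MD5 when mappings change;
    // we would only hit storage when the hash differs from what we last loaded.
    public bool TemplateChanged()
    {
        var currentHash = Environment.GetEnvironmentVariable("TEMPLATE_CONTENT_MD5");
        if (string.Equals(currentHash, _lastSeenHash, StringComparison.Ordinal))
        {
            return false;
        }

        _lastSeenHash = currentHash;
        return true;
    }
}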
Just to make sure I understand the suggestion: it is to not refresh the template during execution at all, and instead rely on the provisioning flow to trigger the update?
That is an approach we can take, though it doesn't address how new templates are picked up in OSS deployments. It is a good idea and a good way for us to improve this, but we will need different configurable strategies we can inject for OSS vs the managed service. We are looking at some improvements for how users can manage their templates; perhaps we can do this as part of that work stream. A sketch of what that seam might look like is below.
For now, I felt it was a pretty substantial departure from how we refresh templates today, so I wanted to be cautious and preserve the existing behavior.
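The configurable strategies could be as simple as an interface like this (hypothetical, not part of the PR):

using System.Threading.Tasks;

// Hypothetical seam: an OSS deployment could inject a polling strategy,
// while the managed service injects one driven by the provisioning signal.
public interface ITemplateRefreshStrategy
{
    Task<bool> ShouldRefreshAsync();
}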
Looks good. Just had a small suggestion on how we retrieve the template.
{
    semaphore.Release();
}
}
nit: Can this method (GetNormalizationTemplate()) and the semaphore be moved to the TemplateManager? Just curious if we can encapsulate all the logic that determines which template to use in a single class that we can update later if we want to.
The template manager is also used for the FHIR mapping templates, and I didn't want to impact that code significantly with this PR. But you are correct, this can and should be handled by its own class/responsibility. I am planning on addressing this in a follow-up PR (but it won't be part of next week's release).
{
    var template = await GetNormalizationTemplate();

    var normalizationBatch = new List<(string sourcePartition, IMeasurement measurement)>(50);
What is the mechanism that limits the number of events to 50?
This doesn't limit the list to 50; it is just the initial capacity of the list (the size of the internal array that backs it). If not set, the array size is 0 and then needs to be increased as elements are added. If there are more than 50 elements, the capacity will be expanded.
The intent is to set a reasonable starting capacity to avoid excessive new array allocations and array copies as the list grows. I believe the implementation of List doubles the capacity when it exceeds the limit.
See https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.list-1.-ctor?view=net-6.0#system-collections-generic-list-1-ctor(system-int32) for more details.
I was able to find the source code here: https://source.dot.net/#System.Private.CoreLib/List.cs,421. When growing, it does double the underlying array length. If no capacity is set, the default array size when it first grows is 4.
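A quick way to observe that growth pattern:

using System;
using System.Collections.Generic;

var list = new List<int>();
var lastCapacity = -1;

for (var i = 0; i < 60; i++)
{
    list.Add(i);
    if (list.Capacity != lastCapacity)
    {
        // Prints capacities 4, 8, 16, 32, 64: the first allocation is 4,
        // then the backing array doubles each time it fills.
        Console.WriteLine($"Count={list.Count}, Capacity={list.Capacity}");
        lastCapacity = list.Capacity;
    }
}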
Updated approach to the large message perf improvements: instead of streaming, keep the existing batch service but update the settings to lower thresholds (a buffer size of 10 instead of 100 and a wait time of 10 seconds instead of 30). This reduces overhead while still batching on egress for the perf benefits.
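In configuration terms, the change amounts to something like the following (the class and property names are illustrative, not the actual option keys in this repo):

using System;

// Hypothetical options object reflecting the new thresholds described above.
public class EventBatchingOptions
{
    // Was 100; a smaller buffer flushes sooner and reduces per-event latency.
    public int MaxEvents { get; set; } = 10;

    // Was 30 seconds; a shorter window bounds how long a partial batch waits.
    public TimeSpan FlushTimespan { get; set; } = TimeSpan.FromSeconds(10);
}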