
Personal/duburson/remove sa prototype #68

Closed
wants to merge 5 commits

Conversation

dustinburson
Member

Create a Prototype for processing IoMT events without Streaming Analytics

Leverages source code for Azure Function Event Hub extensions.

The majority of the changes are in the EventHubListner.cs and MeasurementFhirImportService.cs classes.

Additional improvements include:

  • Switching to the latest Azure.Messaging.EventHubs SDK. The newer SDK has expanded options for controlling the identity used to connect to the Event Hub.
  • Ensure the time period is standardized to a common period. For example, if the timed flush is 1 minute, it should fire at the top of every minute, not at an arbitrary starting point determined by when processing begins. This ensures repeatability and consistency across partitions.
  • Events should be matched to the time period for repeatability. For example, if you have 2,000 events and a 1 minute flush time, events should be read until they are past the 1 minute period (based on the event enqueued time), at which point a batch is issued. This ensures repeatability in downstream processing if the pipeline is reseeded (see the sketch after this list).
  • Ensure retries on failures. Right now an unhandled error will be ignored and not retried. This needs to be updated to keep retrying, as Stream Analytics does.
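
A minimal sketch of what that deterministic window assignment could look like (illustrative only; the type and method names are not from this PR):

using System;

public static class FlushWindow
{
    // Maps an event's enqueued time to the end of the flush window that contains it.
    // With a 1 minute period, an event enqueued at 12:00:37 belongs to the window
    // closing at 12:01:00, regardless of when processing started.
    public static DateTimeOffset GetWindowEnd(DateTimeOffset enqueuedTime, TimeSpan period)
    {
        long periodTicks = period.Ticks;
        long windowStartTicks = (enqueuedTime.UtcTicks / periodTicks) * periodTicks;
        return new DateTimeOffset(windowStartTicks + periodTicks, TimeSpan.Zero);
    }

    // Events are accumulated until one arrives whose enqueued time is past the
    // window end; at that point the batch is issued and a new window begins.
    public static bool IsPastWindow(DateTimeOffset enqueuedTime, DateTimeOffset windowEnd)
    {
        return enqueuedTime >= windowEnd;
    }
}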

Additional Ideas:

  • Memory consumption may be high using this model. It might be beneficial to introduce some compression.
  • Deserialize events directly into measurement groups/measurements and flush those to the function. This would make consumption in the functions easier and give us additional opportunities to reduce memory to just the objects we need, but it has additional challenges. Watermarking progress on the Event Hub is done based on the sequence number and partition, so if we went this route we would need to correlate the correct event data with the measurements so the watermark can be saved correctly (see the sketch after this list). Going this route also reduces the utility of the trigger we create for other applications.
  • Use the Event Hub trigger to just track watermark changes, and use additional mechanisms like durable functions to process the events when we hit our batch size or timed flush. PHI isn't an issue because we are just transmitting the event metadata. In prior testing the spin-up time for an Event Hub processor is pretty long (a couple of seconds) and not really suited for this. We would need to see if there is a lighter-weight alternative to make this feasible. If so, it would be a good option and would reduce concerns about memory usage, since we would only load messages when it is time to process them based on the watermarks.
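
As a rough illustration of the correlation problem in the second idea, each deserialized measurement would need to carry enough Event Hub metadata to advance the watermark safely. The types below are hypothetical, not part of this PR:

using System;
using System.Collections.Generic;
using System.Linq;

// Each measurement remembers the partition and sequence number of the event it
// came from, so the checkpoint can only advance to sequence numbers that have
// actually been flushed downstream.
public record TrackedMeasurement(string DeviceId, DateTimeOffset Timestamp, string PartitionId, long SequenceNumber);

public static class WatermarkHelper
{
    public static IDictionary<string, long> GetSafeWatermarks(IEnumerable<TrackedMeasurement> flushed)
    {
        return flushed
            .GroupBy(m => m.PartitionId)
            .ToDictionary(g => g.Key, g => g.Max(m => m.SequenceNumber));
    }
}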

@ms-teli
Contributor

ms-teli commented Sep 22, 2020

In general, and correct me if I'm wrong, the implementation becomes more like a periodic long-running job. So, assuming we're going to use AKS, we would need to deal with the complexity of managing the lifecycle of a Pod and the lifecycle of a buffered event batch in terms of auto-scaling. For example, the function runtime could be holding a tumbling window while the orchestrator decides to terminate it. I think there is a mechanism to deal with this situation, but it would require some more investigation.

@dustinburson
Member Author

In general, and correct me if I'm wrong, the implementation becomes more like a periodic long-running job. So, assuming we're going to use AKS, we would need to deal with the complexity of managing the lifecycle of a Pod and the lifecycle of a buffered event batch in terms of auto-scaling. For example, the function runtime could be holding a tumbling window while the orchestrator decides to terminate it. I think there is a mechanism to deal with this situation, but it would require some more investigation.

Kind of. If we are buffering but the application is terminated and restarted across multiple pods in a scale-out or scale-in scenario, we still have the original watermark in Azure Storage for each partition, so we should just resume where we left off. The data that was buffered will need to be re-read, but it should be buffered and then eventually flushed in the same manner. This is why it is important to have a deterministic buffering mechanism. The ability to deterministically replay is important not just for the reseed scenario but is also critical for these failure/scaling scenarios.

This is also why the custom Azure Function trigger is necessary. Without it we don't control when the watermark is persisted in relation to the buffering. The code here is designed to update the watermark only after the batch is complete. Hopefully that covers your concern. Let me know if you need more discussion!
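
For illustration, the ordering described above looks roughly like the sketch below, assuming the newer Azure.Messaging.EventHubs processor model; the buffer and flush helpers are placeholders, not the classes in this PR:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

public class CheckpointAfterFlushSketch
{
    private readonly ConcurrentDictionary<string, List<EventData>> _buffer = new ConcurrentDictionary<string, List<EventData>>();

    // Wire this up to an EventProcessorClient's ProcessEventAsync event. The
    // checkpoint (watermark) is persisted only after the buffered batch has been
    // flushed, so a restart replays and rebuilds the same deterministic batch.
    public async Task OnProcessEventAsync(ProcessEventArgs args)
    {
        var pending = _buffer.GetOrAdd(args.Partition.PartitionId, _ => new List<EventData>());
        pending.Add(args.Data);

        if (WindowIsComplete(args.Data.EnqueuedTime))                      // placeholder window check
        {
            await FlushToFunctionAsync(pending);                           // placeholder downstream flush
            pending.Clear();
            await args.UpdateCheckpointAsync(args.CancellationToken);      // watermark moves only now
        }
    }

    private static bool WindowIsComplete(DateTimeOffset enqueuedTime) => false;            // illustrative stub
    private static Task FlushToFunctionAsync(List<EventData> batch) => Task.CompletedTask; // illustrative stub
}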

return new EventProcessor(_options, _executor, _logger, _singleDispatch);
}

public IScaleMonitor GetMonitor()
Contributor


How would we auto-scale this new EventHubTrigger/EventHubListener across Azure Functions?

Member Author


This is handled by the hosting infrastructure. If we emit the same metrics as the current EventHubTrigger, we should be able to leverage the same built-in AKS scaling. The scaling itself isn't handled in the function, just the telemetry collection.
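
For context, a heavily hedged sketch of what reporting those metrics could look like through the WebJobs IScaleMonitor interface referenced in the diff; this is not the PR's implementation, and the metric values shown are placeholders:

using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Scale;

// Sketch: the listener exposes a scale monitor so the hosting layer (Functions
// scale controller, or KEDA on AKS) makes the scale-out/scale-in decision; the
// trigger only reports backlog telemetry.
public class IomtEventHubScaleMonitor : IScaleMonitor
{
    private sealed class ListenerScaleMetrics : ScaleMetrics
    {
        public long UnprocessedEventCount { get; set; }
    }

    public ScaleMonitorDescriptor Descriptor { get; } = new ScaleMonitorDescriptor("iomt-eventhub-listener");

    public Task<ScaleMetrics> GetMetricsAsync()
    {
        // A real implementation would report the unprocessed event count per
        // partition, mirroring what the built-in EventHubTrigger emits.
        return Task.FromResult<ScaleMetrics>(new ListenerScaleMetrics { UnprocessedEventCount = 0 });
    }

    public ScaleStatus GetScaleStatus(ScaleStatusContext context)
    {
        // A real implementation compares recent metric samples and votes to scale
        // out, scale in, or hold; the host aggregates votes across monitors.
        return new ScaleStatus { Vote = ScaleVote.None };
    }
}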

_logger = logger;
_pendingEventData = new ConcurrentDictionary<string, PartitionEventData>();
_flushTimeSpan = TimeSpan.FromMinutes(1);
_timedFlush = Task.Run(TimedFlush);
Contributor


In order to support time-based flushing, won't the function app need to be running? And if it's running, won't it be incurring cost?

Member Author


Even without time-based flushing the function is always running, since the underlying Event Hub processor needs to poll for events. It is probably better to think of this as a time-based buffer, since in the normal mode events are immediately sent to the function (the name in the code notwithstanding). This instead waits until the specified time to send the batch, reducing the calls to the FHIR server. This is very helpful if you are using the SampledData type to collect many data points in the same observation. If you are just using ValueQuantity or some other single-use type, it is less beneficial since each message will become an observation no matter what.

The scale-to-zero solution with KEDA works independently of this trigger. In a KEDA scenario, KEDA would monitor the event hub and then spin up the first instance of the function. At that point data would be buffered and sent to the FHIR service at regular intervals.
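
A rough sketch of the timed-buffer loop, with the flush aligned to the period boundary rather than to processor start time (the flush delegate is a placeholder, not the PR's code):

using System;
using System.Threading.Tasks;

public class TimedFlushSketch
{
    private readonly TimeSpan _flushTimeSpan = TimeSpan.FromMinutes(1);

    // Runs for the lifetime of the listener: sleep until the next aligned period
    // boundary (e.g. the top of the minute), then flush whatever has been buffered.
    public async Task TimedFlush(Func<Task> flushPendingAsync)
    {
        while (true)
        {
            var now = DateTimeOffset.UtcNow;
            long periodTicks = _flushTimeSpan.Ticks;
            long nextBoundaryTicks = ((now.UtcTicks / periodTicks) + 1) * periodTicks;

            await Task.Delay(new DateTimeOffset(nextBoundaryTicks, TimeSpan.Zero) - now);
            await flushPendingAsync();   // send the buffered batch on to FHIR import
        }
    }
}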

@namalu
Member

namalu commented Sep 23, 2020

I'm wondering if there is an opportunity to remove the normalizeddata Event Hub and call the MeasurementCollectionToFhir function through a service deployed in K8s, and watermark only after a 200 success (when the Observation create/update is successful). I can see this being useful for a few reasons:

  1. If/when a failure occurs, at any point in the conversion process, the checkpoint in the devicedata Event Hub does not advance. Currently, messages can get stuck in the normalizeddata Event Hub if the FHIR conversion fails for any reason. Looking at the message in/out statistics on the devicedata Event Hub makes it seem like there is no problem, and a user can end up wondering where the data is - "It's not in the Event Hub, and it's not in FHIR..."

  2. I think it will reduce cost. We shave off the cost of the Event Hub (it looks like an Event Hub sitting idle costs $0.21 per day) and the cost associated with storage account checkpointing, and if MeasurementCollectionToFhir is an HttpTrigger, it won't execute until there is work to do. My understanding is that when we use EventHubTriggers, they poll the Event Hub frequently to check for work, which uses compute. I know we are talking pennies here, but it adds up.

  3. Less internal identity management. We still need to worry about the Function AzureWebJobsStorage, but currently, there is no official support for MI, so we will probably be using a connection string which we rotate every 90 days. Without SA and the normalizeddata Event Hub, will we still need an internal Managed Identity?

@dustinburson
Member Author

dustinburson commented Sep 23, 2020

I'm wondering if there is an opportunity to remove the normalizeddata Event Hub and call the MeasurementCollectionToFhir function through a service deployed in K8s, and watermark only after a 200 success (when the Observation create/update is successful). I can see this being useful for a few reasons:

  1. If/when a failure occurs, at any point in the conversion process, the checkpoint in the devicedata Event Hub does not advance. Currently, messages can get stuck in the normalizeddata Event Hub if the FHIR conversion fails for any reason. Looking at the message in/out statistics on the devicedata Event Hub makes it seem like there is no problem, and a user can end up wondering where the data is - "It's not in the Event Hub, and it's not in FHIR..."
  2. I think it will reduce cost. We shave off the cost of the Event Hub (it looks like an Event Hub sitting idle costs $0.21 per day) and the cost associated with storage account checkpointing, and if MeasurementCollectionToFhir is an HttpTrigger, it won't execute until there is work to do. My understanding is that when we use EventHubTriggers, they poll the Event Hub frequently to check for work, which uses compute. I know we are talking pennies here, but it adds up.
  3. Less internal identity management. We still need to worry about the Function AzureWebJobsStorage, but currently, there is no official support for MI, so we will probably be using a connection string which we rotate every 90 days. Without SA and the normalizeddata Event Hub, will we still need an internal Managed Identity?

Thanks for the suggestion, Nate. Unfortunately there are some major hurdles if we remove the normalized event hub. The biggest is that we use the hop from device data to normalized data to partition the normalized data according to the device id. This ensures the data for a given device will land in the same partition and we don't have contention across multiple partitions for the same device's data. This isn't really something we can enforce on the device data event hub, since the partition for an event is set when a message is written, and I would prefer not to leave such a critical item up to the customer to implement correctly.
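
For reference, this is roughly what controlling the partitioning ourselves looks like with the newer SDK; the producer setup is omitted and the method is illustrative, not code from this PR:

using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

public static class NormalizedEventWriter
{
    // Writing to the normalized Event Hub with the device id as the partition key
    // routes every event for a device to the same partition, without relying on
    // the customer to partition the device data hub correctly.
    public static async Task SendNormalizedAsync(EventHubProducerClient producer, string deviceId, string normalizedJson)
    {
        var options = new CreateBatchOptions { PartitionKey = deviceId };

        using EventDataBatch batch = await producer.CreateBatchAsync(options);
        if (!batch.TryAdd(new EventData(Encoding.UTF8.GetBytes(normalizedJson))))
        {
            throw new InvalidOperationException("Normalized event is too large for a single batch.");
        }

        await producer.SendAsync(batch);
    }
}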

Some additional potential problems with removing the normalized data Event Hub:

  • We don't control the partition count. By having a normalized event hub we can ensure a configuration that will work well.
  • We can't scale differently. Due to projections there are often more normalized messages than device messages, which would necessitate additional scaling.
  • If we have an error connecting to the FHIR server, we will put more load (potentially repeated reads) on the customer's Event Hub. They might ask us why we are taking up all their hub bandwidth on egress.
  • It will make it harder for us to scale for additional features, like forwarding normalized events to the customer. This would be best done with an additional consumer group and processing logic reading from that consumer group.

If we were to remove the normalized event hub, I don't think we need to keep the current HttpTrigger. It could just be a call to an internal method on another class. We don't really gain much by keeping it an HttpTrigger; we would still need to rotate function keys, transform the data into HTTP requests, and deal with that overhead. Instead, if we end up going this route, I would suggest an in-memory pipeline with distinct stages.
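
A hedged sketch of what an in-memory pipeline with distinct stages could look like using System.Threading.Channels; the stage wiring and names are illustrative, not from this PR:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class InMemoryPipeline
{
    // Stage 1 (normalization) writes to the channel; stage 2 (FHIR conversion)
    // reads from it in-process. Backpressure comes from the bounded capacity
    // instead of a second Event Hub hop, and no HttpTrigger or function keys
    // are involved.
    public static (ChannelWriter<string> NormalizedWriter, Task FhirStage) Create(Func<string, Task> convertToFhirAsync)
    {
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity: 1000)
        {
            FullMode = BoundedChannelFullMode.Wait, // block producers instead of dropping data
        });

        var fhirStage = Task.Run(async () =>
        {
            await foreach (var normalized in channel.Reader.ReadAllAsync())
            {
                await convertToFhirAsync(normalized); // e.g. the MeasurementCollectionToFhir logic called directly
            }
        });

        return (channel.Writer, fhirStage);
    }
}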

On the cost front, the polling itself is minimal. This is also alleviated if we implement a scale-to-zero solution like KEDA. KEDA still polls, but more infrequently, until data is detected and consumers are ramped up.

The concern about where the data is can be handled with proper telemetry and documentation. Also, one clarifying point: data in the Event Hub isn't consumed like a queue. It will remain there until it ages out.

As for identity management, we will still need the internal managed identity. The managed identity is used to access the key vault where the other secrets, like the storage keys, are kept. Also, as part of this work I think we will need to write our own triggers to use the latest Event Hub and Storage SDKs. I haven't confirmed, but there may be an opportunity to leverage managed identity for accessing the storage account with this change.

@dustinburson dustinburson deleted the personal/duburson/remove-sa-prototype branch November 8, 2022 18:33