
Global Throttling Across Multiple Aggregators #18190

Open
bchen32 opened this issue Aug 9, 2023 · 8 comments
Labels
domain: processing - Anything related to processing Vector's events (parsing, merging, reducing, etc.)
type: feature - A value-adding code addition that introduces new functionality.

Comments


bchen32 commented Aug 9, 2023

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

Context:
Currently building an observability pipeline with the unified architecture across several k8s clusters: a bunch of agents deployed as a DaemonSet, sending logs to centralized aggregators fronted by a load balancer.

Problem:
I'm trying to implement the throttle transform, but I'm realizing that it doesn't quite work as expected once the number of aggregator pods is scaled up. The goal is to have logs throttled by app-name. However, the load balancer might spread different agents from the same app across different aggregator pods, resulting in an effective throttle higher than expected.

Example:
Let's say we have an app foo with 2 replicas across 2 nodes, and we have 2 aggregator pods. Throttle is configured to rate limit at 1 log per second per app. The load balancer decides that node 1 is sending logs to aggregator 1, and node 2 is sending logs to aggregator 2. So, aggregator 1 rate limits node 1 to 1 log per second, and aggregator 2 rate limits node 2 to 1 log per second. Therefore, the total output from the aggregators is 2 logs per second for foo, which is not what we want.
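
For illustration, here's a minimal sketch of the throttle config in question (the `app_name` field and the source/input names are just placeholders from our setup):

```toml
[transforms.throttle_by_app]
type = "throttle"
inputs = ["vector_source"]
# Allow 1 log per second per app. The bucket state is local to each
# aggregator pod, so N pods effectively allow N logs per second per app.
threshold = 1
window_secs = 1
key_field = "{{ app_name }}"
```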

Attempted Solutions

One workaround would be to configure the load balancer to hash by app-name to always connect nodes with the same app-name to the same aggregator pod. The issue is that some apps may have hundreds of replicas and generate more logs than a single aggregator can handle.

Proposal

What seems like the natural solution is just for the aggregator pods to have some sort of global synchronization for throttling.

References

No response

Version

v0.31.0

@bchen32 added the "type: feature" label on Aug 9, 2023
@bchen32 changed the title from "Global" to "Global Throttling Across Multiple Aggregators" on Aug 9, 2023
jszwedko (Member) commented:

Thanks for this request @bchen32! As you note, this will require sharing state across Vector instances, which is not something we have plans for in the near future as it greatly complicates the setup, but it might be something we do eventually. The workaround you identified, partitioning the input along the fields you need to throttle by, is the usual one we recommend.

A related workaround would be to separate just that step of processing to its own set of Vector aggregators that only apply the throttling but do no other processing. This could help you manage their resources better.

@jszwedko added the "domain: processing" label on Aug 10, 2023
bchen32 (Author) commented Aug 10, 2023

@jszwedko That makes sense. In terms of partitioning the input, what's the recommended way to implement that? Currently using a vector sink -> vector source and it's not entirely clear to me how to even partition the logs by app-name, given that a single agent could be batching and sending logs from multiple apps at the same time.

jszwedko (Member) commented:

Ah, yeah, this recommendation is generally given when the partitioning is by client, in which case you can use sticky load balancing. If you have multiple clients that are sending events with the same app-name then this'll be trickier. I think what we would need/want to do is to have the http sink be able to set HTTP headers based on event data and then you could use those headers to do the routing. Do you think that would work for you? The http sink can be used with the http_server source to send events between two Vector instances.
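
Roughly, the topology would look something like this sketch. Note that the templated header on the http sink is hypothetical; only static headers work today:

```toml
# Agent side (sketch): http sink pointed at the aggregator load balancer.
[sinks.to_aggregators]
type = "http"
inputs = ["app_logs"]
uri = "http://aggregator-lb:8080/"
encoding.codec = "json"
# Hypothetical: a templated header the load balancer could hash on for
# routing. Static headers are supported; templated values are not yet.
request.headers.X-App-Name = "{{ app_name }}"

# Aggregator side (sketch): receive the partitioned stream.
[sources.from_agents]
type = "http_server"
address = "0.0.0.0:8080"
decoding.codec = "json"
```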

bchen32 (Author) commented Aug 15, 2023

That makes sense, we did consider that option. From what I can understand though, each http request is a batch of multiple logs. So you can't guarantee that all logs in a request are even from the same app-name, which makes it difficult to use that as a header. I guess you could limit the batch max_events to 1, but I'm assuming that might come with some negative performance/network traffic implications.
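
Something like this, I guess, though one request per event seems expensive (sink and input names are placeholders):

```toml
[sinks.to_aggregators]
type = "http"
inputs = ["app_logs"]
uri = "http://aggregator-lb:8080/"
encoding.codec = "json"
# One event per request so every request belongs to a single app-name,
# at the cost of many more (and much smaller) requests.
batch.max_events = 1
```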

jszwedko (Member) commented:

Ah, actually, I meant to mention that the header would be involved in batch partitioning too. That is, if we allow dynamic headers to be set, the http sink would batch together events that have the same header value before sending.

bchen32 (Author) commented Aug 16, 2023

Oh interesting, does the http sink currently support dynamic headers?

jszwedko (Member) commented:

Not yet, unfortunately. That is being tracked by #201.

srstrickland (Contributor) commented:

Just thought I'd chime in here because I would find this extremely useful. There are a bunch of workarounds we can employ, like:

  • making sure everything from the same key lands on the same host, but this has potential skew implications
  • rate limiting at the service mesh level, but this mostly limits the number of requests, not the number of individual records. We could also potentially throw in a max content-length, so the combination of max-requests and max-request-size would get us in the ballpark, but it's still somewhat suboptimal.

Istio (the service mesh we use in the cluster running vector components) has a compelling solution they call a Global Rate Limit. The short version is that it defers rate limiting decisions to a gRPC call with a specific API (they even provide a reference implementation that uses Redis). This approach could be appropriate here, as it defers the complexity to an external service, and it could be tailored to high-throughput scenarios by making the gRPC calls on configurable batches of data rather than on every message (e.g. canAfford(key, numRecords)); a rough config sketch follows at the end of this comment.

I understand that despite the apparent simplicity, this could still be a big lift, but it would provide huge value, particularly because:

  1. We autoscale many vector deployments based on many factors using KEDA
  2. Just accepting a "fudge factor" (e.g. rate limit is 1/N) relies on data being evenly balanced
  3. Sending all data for a particular key can lead to large skew
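
For illustration, a purely hypothetical sketch of what deferring the throttle decision to a shared rate limit service might look like in a Vector config. None of the rate_limit_service options below exist today; they're only meant to convey the shape of the proposal:

```toml
[transforms.throttle_by_app]
type = "throttle"
inputs = ["vector_source"]
threshold = 1
window_secs = 1
key_field = "{{ app_name }}"
# Hypothetical options: instead of keeping bucket state locally, defer the
# allow/deny decision to an external rate limit service shared by every
# aggregator pod, checking quota in batches (canAfford(key, numRecords))
# rather than per event to keep the gRPC overhead manageable.
rate_limit_service.endpoint = "http://ratelimit.observability.svc:8081"
rate_limit_service.batch_max_events = 100
```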
