GCP PubSub Input Performance #35029

Closed
1 of 2 tasks
kcreddy opened this issue Apr 5, 2023 · 6 comments
Labels: enhancement, Filebeat, Team:Security-Service Integrations

kcreddy (Contributor) commented Apr 5, 2023

The GCP PubSub input has certain bottlenecks that need to be addressed:

  • Save CPU time while publishing events to the internal queue by disabling EventNormalization.
  • Test if creating multiple beat pipeline clients improves performance.
    • There is a single beat.Client for the input instance. The input creates many goroutines to read pubsub messages, but the beat.Client becomes a bottleneck because the Publish() call acquires a lock.
    • Need to test the effect of having multiple beat.Clients in a pool for each pubsub input instance (e.g. one client per configured num_goroutines). This is similar to a change made in the AWS S3 input, which massively increased input performance.
    • Use multiple pub/sub subscriptions (parallel calls to Receive()), each with its own beat.Client (see the sketch below this list).
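
For illustration, a minimal sketch of that last idea (parallel Receive() calls, each with its own pipeline client), assuming a beat.Pipeline and a *pubsub.Client are already available; startReceivers and handle are illustrative names, not existing Filebeat code:

```go
package gcppubsub

import (
	"context"
	"sync"

	"cloud.google.com/go/pubsub"

	"github.com/elastic/beats/v7/libbeat/beat"
)

// startReceivers sketches the idea above: n parallel Receive() calls, each
// with its own subscription handle and its own pipeline client, so concurrent
// publishes never contend on a single shared Publish lock.
// handle stands in for the input's message-to-event conversion and acking.
func startReceivers(ctx context.Context, pipeline beat.Pipeline, gcpClient *pubsub.Client,
	subID string, n int, handle func(beat.Client, *pubsub.Message)) error {

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		client, err := pipeline.ConnectWith(beat.ClientConfig{})
		if err != nil {
			wg.Wait()
			return err
		}
		sub := gcpClient.Subscription(subID) // independent handle per goroutine
		wg.Add(1)
		go func(c beat.Client) {
			defer wg.Done()
			defer c.Close()
			// Each Receive call publishes only through its own client.
			_ = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
				handle(c, msg)
			})
		}(client)
	}
	wg.Wait()
	return nil
}
```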
elasticmachine (Collaborator) commented:

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

kcreddy changed the title from GCP PubSub Input Performance Enhancements to GCP PubSub Input Performance on Apr 19, 2023
elasticmachine (Collaborator) commented:

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

kcreddy (Contributor, Author) commented Apr 17, 2024

Closed #37657, which was aimed at creating multiple pubsub clients rather than beat pipeline clients. Having multiple beat pipeline clients helps reduce lock contention, but as seen in the mutex profiles attached to that PR, multiple pubsub clients don't really reduce it.

The solution we need is similar to AWS S3's SQS event processor:

// Create a pipeline client scoped to this goroutine.
client, err := p.pipeline.ConnectWith(beat.ClientConfig{

Here, the S3 input creates one pipeline client for each SQS message to process all S3 events within that SQS message.
In this case, the GCP PubSub input must instead maintain a sync.Pool of pipeline clients, with each pipeline client processing one pubsub message. Although there is a CPU cost associated with fetching and releasing clients from the pool, it might reduce lock contention; the benefits and drawbacks should be tested.

kcreddy (Contributor, Author) commented Jun 24, 2024

Hey @andrewkroh,

I tried the 2 variations we discussed:

  1. adding pipeline clients inside an array
  2. using sync.Pool.

As per the results below, no performance improvement was observed in either variation.

Both variations are based on this commit, which is close to the v8.14.0 tag: 4c4b2f8
The base Filebeat version I used for comparison is the v8.14.0 tag (which differs from commit 4c4b2f8 only by minor docs- and CI-related changes).

Variation 1: adding pipeline clients inside an array

v8.14.0...kcreddy:beats:variation1-array: 2 files changed: x-pack/filebeat/input/gcppubsub/config.go and x-pack/filebeat/input/gcppubsub/input.go

  1. Create a configurable number of Outleter instances (pipeline clients) and store them in an array.
  2. Choose a pipeline client randomly from the array during sub.Receive and use it to publish messages (see the sketch after this list).
    PR: x-pack/filebeat/input/gcppubsub: Add configurable outlets #39999
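
A rough sketch of that selection logic, assuming a pre-built slice of outlets; outlets and pickOutlet are illustrative names, not the PR's actual code:

```go
package gcppubsub

import (
	"math/rand"

	"github.com/elastic/beats/v7/filebeat/channel"
)

// outlets would hold the pipeline clients created at input start-up; its
// length corresponds to the outlet.num_pipelines setting.
var outlets []channel.Outleter

// pickOutlet returns a random outlet. The package-level rand functions are
// safe for concurrent use, which matters because sub.Receive invokes its
// callback from many goroutines. (The "seed" variant in the results below
// additionally seeds this source with time.Now().UnixNano().)
func pickOutlet() channel.Outleter {
	return outlets[rand.Intn(len(outlets))]
}
```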

Results:
default -> subscription.num_goroutines: 1, subscription.max_outstanding_messages: 1600
ocl -> outlet.num_pipelines (pipeline clients) config setting.
seed -> seeding the random client selection with time.Now().UnixNano() (inside the PR).
g2 -> subscription.num_goroutines: 2
m2000 -> subscription.max_outstanding_messages: 2000
r30s -> Test ran for 30 seconds

  1. Events Per 30 seconds, measured from Non-zero metrics log.
| Run | 8.14.0 | 8.14.1 (2 ocl) | 8.14.1 (5 ocl) | 8.14.1 (5 ocl+seed) |
| --- | --- | --- | --- | --- |
| default-r30s | 86k | 90k | 88k | 89k |
| g2m2000r30s | 186k | 197k | 170k | 177k |
  2. Contentions: from running the pprof command: go tool pprof -seconds 30 http://localhost:5066/debug/pprof/mutex
    pprof.filebeat.contentions.delay.001.8.14.0-default-r30s.pb.gz
    pprof.filebeat.contentions.delay.001-8.14.0-g2m2000-r30s.pb.gz
    pprof.filebeat.contentions.delay.001-8.14.1-default-ocl5-seed-r30s.pb.gz
    pprof.filebeat.contentions.delay.001-8.14.1-g2m2000-ocl5-seed-r30s.pb.gz

Variation 2: using sync.Pool.

v8.14.0...kcreddy:beats:variation2-syncpool: 1 file changed: x-pack/filebeat/input/gcppubsub/input.go

  1. Create a sync.Pool that returns an Outleter (pipeline client).
  2. In sub.Receive, get a client with pool.Get(), use it to publish messages, and return it with pool.Put() (see the sketch after this list).
    PR: x-pack/filebeat/input/gcppubsub: Add syncpool on outlet #39998
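
A minimal sketch of that flow, using beat.Client directly as a stand-in for the input's Outleter and assuming hypothetical connect and makeEvent helpers (neither is the PR's actual code):

```go
package gcppubsub

import (
	"context"
	"sync"

	"cloud.google.com/go/pubsub"

	"github.com/elastic/beats/v7/libbeat/beat"
)

// Placeholders for the input's own helpers.
var (
	connect   func() (beat.Client, error)      // wraps the pipeline connection
	makeEvent func(*pubsub.Message) beat.Event // converts a message to an event
)

// clientPool hands a pipeline client to each concurrent Receive callback.
var clientPool = sync.Pool{
	New: func() interface{} {
		c, err := connect()
		if err != nil {
			return nil
		}
		return c
	},
}

// handleMessage has the shape of the sub.Receive callback: Get a client,
// publish, then Put the client back so the next callback can reuse it.
func handleMessage(_ context.Context, msg *pubsub.Message) {
	v := clientPool.Get()
	if v == nil {
		msg.Nack() // could not connect; let the message be redelivered
		return
	}
	client := v.(beat.Client)
	defer clientPool.Put(client)

	client.Publish(makeEvent(msg))
	msg.Ack()
}
```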

Results:
default -> subscription.num_goroutines: 1, subscription.max_outstanding_messages: 1600
g2 -> subscription.num_goroutines: 2
m2000 -> subscription.max_outstanding_messages: 2000
r30s -> Test ran for 30 seconds

  1. Events Per 30 seconds, measured from Non-zero metrics log.
| Run | 8.14.0 | 8.14.1 |
| --- | --- | --- |
| default-r30s | 86k | 86k |
| g2m2000r30s | 186k | 165k |
  2. Contentions: from running the pprof command: go tool pprof -seconds 30 http://localhost:5066/debug/pprof/mutex
    pprof.filebeat.contentions.delay.001.8.14.0-default-r30s.pb.gz
    pprof.filebeat.contentions.delay.001-8.14.0-g2m2000-r30s.pb.gz
    pprof.filebeat.contentions.delay.001.8.14.1-default-syncpool-r30s.pb.gz
    pprof.filebeat.contentions.delay.001.8.14.1-g2m2000-syncpool-r30s.pb.gz

andrewkroh (Member) commented:

The contention for the lock within the beat.Client Publish() will be most noticeable when there is some edge-processing being used (like dissecting a message in the Beat). For our integrations we have shifted most processing out of this area where the lock is being held and into Elasticsearch ingest node. The exceptions are for the default processors that are always present in Agent.

If there is little edge processing, then optimizing to avoid contention on the publish lock probably isn't worthwhile; the sync.Pool experiment is showing that. And if we did pursue this, we would need to use something other than sync.Pool to provide a pool of clients, because we would want some upper bound on the number of clients and we need to be able to Close() the clients.
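
A rough sketch of such a bounded, closeable alternative, handing clients out through a buffered channel instead of sync.Pool; boundedPool and its methods are illustrative names only:

```go
package gcppubsub

import "github.com/elastic/beats/v7/libbeat/beat"

// boundedPool sketches the alternative hinted at above: a fixed number of
// clients handed out through a buffered channel, so there is an upper bound
// on the client count and every client can be Close()d on shutdown
// (sync.Pool can silently drop items, so they could never be closed reliably).
type boundedPool struct {
	free chan beat.Client
	all  []beat.Client
}

func newBoundedPool(pipeline beat.Pipeline, size int) (*boundedPool, error) {
	p := &boundedPool{free: make(chan beat.Client, size)}
	for i := 0; i < size; i++ {
		c, err := pipeline.ConnectWith(beat.ClientConfig{})
		if err != nil {
			p.Close()
			return nil, err
		}
		p.all = append(p.all, c)
		p.free <- c
	}
	return p, nil
}

// Acquire blocks until a client is free; Release returns one to the pool.
func (p *boundedPool) Acquire() beat.Client  { return <-p.free }
func (p *boundedPool) Release(c beat.Client) { p.free <- c }

// Close closes every client exactly once.
func (p *boundedPool) Close() {
	for _, c := range p.all {
		c.Close()
	}
}
```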

Let's verify that we can achieve 20k EPS with no code changes, and record the settings we used. We can refer to those in the future if we are doing tuning.

kcreddy (Contributor, Author) commented Jun 30, 2024

I have run a few more tests on the existing Filebeat (no code changes) to check whether we can reach much higher throughput just by tuning the existing settings. The following tests were run with the output.file configuration and no changes to the default internal queue settings.

default -> subscription.num_goroutines: 1, subscription.max_outstanding_messages: 1600
g -> subscription.num_goroutines. Example: g2 => subscription.num_goroutines: 2
m -> subscription.max_outstanding_messages. Example: m2000 => subscription.max_outstanding_messages: 2000
r30s -> Test ran for 30 seconds
| Run | 8.14.0 (current GA), events/30s | EPS |
| --- | --- | --- |
| default-r30s | 86k | 3k |
| g2m2000r30s | 186k | 6k |
| g5m2000r30s | 408k | 14k |
| g5m6000r30s | 580k | 19k |
| g10m2000r30s | 496k | 17k |
| g10m6000r30s | 630k | 21k |

So, just by tuning subscription.num_goroutines and subscription.max_outstanding_messages, we can push past 20k events per second.
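
For the record, an illustrative filebeat.yml snippet along the lines of the g10m6000 run; the project, topic, and subscription names are placeholders, and the values are examples rather than recommendations:

```yaml
filebeat.inputs:
  - type: gcp-pubsub
    project_id: my-gcp-project          # placeholder
    topic: my-topic                     # placeholder
    subscription.name: my-subscription  # placeholder
    subscription.num_goroutines: 10
    subscription.max_outstanding_messages: 6000
```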

Given the lack of throughput improvement, optimising only for contention isn't worth it, so this investigation into adding multiple outlet (pipeline) clients won't be pursued further.
