
Beats panic if there are more than 32767 pipeline clients #38197

Closed
belimawr opened this issue Mar 6, 2024 · 5 comments · Fixed by #38556
Labels: bug, Team:Elastic-Agent

belimawr (Contributor) commented Mar 6, 2024

This has been tested with Filebeat, but the bug is in libbeat, so it likely affects all Beats. The OS tested was Linux, but the issue is not OS dependent.

Description

When Filebeat is configured to harvest more than 32767 files at once with the filestream input (other inputs are likely affected as well; the log input is not), it panics. This happens because Pipeline.runSignalPropagation adds two elements to a slice of channels for each file; once this slice grows past 65536 elements, calling reflect.Select on it makes Filebeat panic. The panic happens here:

chosen, recv, recvOK := reflect.Select(channels)
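For context, the 65536-case cap comes from the Go runtime itself; a minimal standalone program (not Beats code) reproduces the same panic:

package main

import "reflect"

func main() {
    // Build one more receive case than reflect.Select accepts (max 65536).
    cases := make([]reflect.SelectCase, 0, 65537)
    for i := 0; i < 65537; i++ {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(make(chan struct{})),
        })
    }
    // Panics with: reflect.Select: too many cases (max 65536)
    reflect.Select(cases)
}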

For every new client, this infinite for loop adds two elements to the slice of channels:

    if client := recv.Interface().(*client); client != nil {
        channels = append(channels,
            reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(client.closeRef.Done()),
            },
            reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(client.done),
            },
        )
        clients = append(clients, client)
    }
    continue
}

Once the slice grows past 65536 elements, Filebeat panics with a message/stack trace like this:

panic: reflect.Select: too many cases (max 65536)
goroutine 277 [running]:
reflect.Select({0xc26e47e000?, 0xc360100000?, 0xc024488f10?})
reflect/value.go:2873 +0x79a
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).runSignalPropagation(0xc02f6b47b8?)
github.com/elastic/beats/v7/libbeat/publisher/pipeline/pipeline.go:330 +0x1d8
created by github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*Pipeline).registerSignalPropagation.func1
github.com/elastic/beats/v7/libbeat/publisher/pipeline/pipeline.go:315 +0x97

How to reproduce

The easiest way to reproduce this issue is to create 33000 small log files, then start Filebeat to harvest them. While testing, I ran into some OS limitations when trying to keep all files constantly updated.

You can use anything to generate the files; I used the following shell script and flog:

#!/bin/bash

for i in $(seq 1 $1)
do
    path=$(printf "/tmp/log%010d.log\n" $i)
    flog -s 1 -n 100 > $path
    echo $path
done

Save it as gen-logs.sh, make it executable and run ./gen-logs.sh 33000.

Use the file output for simplicity and to make it easy to validate that all logs have been ingested. The output and logging configuration below ensure there will be a single log file and a single output file.

filebeat.inputs:
  - type: filestream
    id: my-unique-id-per-filebeat-process
    paths:
      - /tmp/log*.log
    #harvester_limit: 32000

output:
  file:
    enabled: true
    path: ${path.home}/output
    filename: out
    rotate_every_kb: 100000000
    rotate_on_startup: false

logging:
  level: debug
  selectors:
    - crawler
    - file_watcher
    - input
    - input.filestream
    - input.harvester
    - registrar
    - scanner
    - service
  files:
    rotateeverybytes: 10485760000
    rotateonstartup: false

Start Filebeat and wait until it panics.

If you uncomment the #harvester_limit: 32000 line, Filebeat will work without issues and ingest all files. If you used the script provided, there should be 3300000 events in the output file; you can verify that with:

wc -l output/*

Workaround

One workaround is to set the harvester_limit to a number smaller than 32000 if using a single input. If multiple affected inputs are used, each input should limit the number of pipeline clients it creates so that a running Filebeat process never has more than 32000 pipeline clients at once.

For the filestream input, here is an example configuration:

filebeat.inputs:
  - type: filestream
    id: my-unique-id-per-filebeat-process
    paths:
      - /tmp/log*.log
    harvester_limit: 32000
belimawr added the bug and Team:Elastic-Agent labels on Mar 6, 2024
elasticmachine (Collaborator) commented:

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

cmacknz (Member) commented Mar 6, 2024

> One workaround is to set the harvester_limit to a number smaller than 32000 if using a single input.

How is this a workaround? Do the additional harvesters still execute after waiting for some undefined period of time, or do they possibly never execute? If the latter, this seems more like converting an obvious failure into a silent failure.

Of the 32K active harvesters, how many of them are actually sending data concurrently? I wonder if it would be more effective to put the beat.Clients into something like a sync.Pool so that they are only actually kept around if they are used?

Is there a way to tell how many of the pipeline clients are idle and for how long?
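For illustration only, here is a very rough sketch of what the sync.Pool idea could look like; the names and wiring are assumptions, not existing Beats code, and a real version would still have to close pooled clients on shutdown, which sync.Pool does not track for you:

package example

import (
    "sync"

    "github.com/elastic/beats/v7/libbeat/beat"
)

// clientPool lets harvesters borrow a beat.Client only while they actually
// publish, instead of holding one open for the whole life of the harvester.
type clientPool struct {
    connect func() (beat.Client, error) // e.g. a closure over pipeline.ConnectWith(cfg)
    pool    sync.Pool
}

func newClientPool(connect func() (beat.Client, error)) *clientPool {
    return &clientPool{connect: connect}
}

// publish borrows a client (creating one if none is free), publishes a single
// event, and returns the client to the pool for reuse.
func (p *clientPool) publish(event beat.Event) error {
    client, _ := p.pool.Get().(beat.Client)
    if client == nil {
        var err error
        if client, err = p.connect(); err != nil {
            return err
        }
    }
    client.Publish(event)
    p.pool.Put(client)
    return nil
}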

cmacknz (Member) commented Mar 6, 2024

Another workaround at the input level would be to figure out a way to have multiple inputs harvest the set of files, essentially sharding the input workload.

We could also shard or change the structure of the select cases; it looks like we are only waiting on the done signals. We could just create another select case once we go past the limit on how many cases a single select can handle:

    if client := recv.Interface().(*client); client != nil {
        channels = append(channels,
            reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(client.closeRef.Done()),
            },
            reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(client.done),
            },
        )
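To make the sharding idea concrete, here is a rough sketch (it is not necessarily the change that landed in #38556, and shardedSelect is a hypothetical name): once the case list grows past what a single reflect.Select call accepts, split it into chunks and run one reflect.Select per chunk in its own goroutine, funneling the first result back to the caller.

package pipeline // hypothetical placement; this is not existing Beats code

import "reflect"

// reflect.Select rejects more than 65536 cases, so keep each shard at or below that.
const maxSelectCases = 65536

type selectResult struct {
    chosen int // index into the original cases slice
    recv   reflect.Value
    recvOK bool
}

// shardedSelect behaves like reflect.Select but accepts any number of cases.
// It assumes all cases are receive cases that only signal closure (as
// client.done and closeRef.Done() do), so a "losing" shard receiving a value
// is not a concern here.
func shardedSelect(cases []reflect.SelectCase) selectResult {
    if len(cases) <= maxSelectCases {
        chosen, recv, ok := reflect.Select(cases)
        return selectResult{chosen: chosen, recv: recv, recvOK: ok}
    }

    results := make(chan selectResult, 1)
    cancel := make(chan struct{})
    defer close(cancel) // release the shards that did not win

    // Each shard reserves slot 0 for the cancel channel, leaving
    // maxSelectCases-1 slots for real cases.
    for start := 0; start < len(cases); start += maxSelectCases - 1 {
        end := start + maxSelectCases - 1
        if end > len(cases) {
            end = len(cases)
        }
        shard := make([]reflect.SelectCase, 0, end-start+1)
        shard = append(shard, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(cancel),
        })
        shard = append(shard, cases[start:end]...)

        go func(offset int, shard []reflect.SelectCase) {
            chosen, recv, ok := reflect.Select(shard)
            if chosen == 0 {
                return // cancel fired: another shard already produced a result
            }
            select {
            case results <- selectResult{chosen: offset + chosen - 1, recv: recv, recvOK: ok}:
            default: // a result was already delivered
            }
        }(start, shard)
    }
    return <-results
}

A production version would likely keep long-lived worker goroutines per shard instead of respawning them on every loop iteration, but the shape of the idea is the same.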

belimawr (Contributor, Author) commented Mar 6, 2024

> How is this a workaround? Do the additional harvesters still execute after waiting for some undefined period of time, or do they possibly never execute? If the latter, this seems more like converting an obvious failure into a silent failure.

The harvester_limit limits the number of concurrent harvesters; once the limit is reached, new harvesters are not created until old ones are closed.

> Of the 32K active harvesters, how many of them are actually sending data concurrently?

In my tests, not many, as the files were not being updated. Theoretically, all files could be live-updated, which would also cause other issues, like the harvesters that are not running being starved. That is definitely a scale edge case we do not cover well.

> I wonder if it would be more effective to put the beat.Clients into something like a sync.Pool so that they are only actually kept around if they are used?

I did not investigate the code to see how we could mitigate it. At least in filestream there is an infinite loop that reads from the file and then publishes the line/event read. Those beat.Clients contain some configuration for close and ACK handling; off the top of my head I don't know how reusable they are.

The affected inputs are the ones that call pipeline.ConnectWith:

// ConnectWith create a new Client for publishing events to the pipeline.
// The client behavior on close and ACK handling can be configured by setting
// the appropriate fields in the passed ClientConfig.
// If not set otherwise the defaut publish mode is OutputChooses.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
and as argument they pass the configuration they want, which includes a processors list 😢

type ClientConfig struct {
    PublishMode PublishMode
    Processing  ProcessingConfig
    // ...
}

type ProcessingConfig struct {
    // EventMetadata configures additional fields/tags to be added to published events.
    EventMetadata mapstr.EventMetadata
    // Meta provides additional meta data to be added to the Meta field in the beat.Event
    // structure.
    Meta mapstr.M
    // Fields provides additional 'global' fields to be added to every event
    Fields mapstr.M
    // DynamicFields provides additional fields to be added to every event, supporting live updates
    DynamicFields *mapstr.Pointer
    // Processors passes additional processor to the client, to be executed before
    // the pipeline processors.
    Processor ProcessorList
    // KeepNull determines whether published events will keep null values or omit them.
    KeepNull bool
    // Disables the addition of host.name if it was enabled for the publisher.
    DisableHost bool
    // EventNormalization controls whether the event normalization processor
    // is applied to events. If nil the Beat's default behavior prevails.
    EventNormalization *bool
    // Disables the addition of input.type
    DisableType bool
    // Private contains additional information to be passed to the processing
    // pipeline builder.
    Private interface{}
}
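For reference, a hypothetical sketch of how an input-side caller might use ConnectWith with this configuration (the connector interface and publishOne function are assumptions for the example, not Beats code); every such call creates one more pipeline client, which is what adds up to the limit:

package example

import (
    "time"

    "github.com/elastic/beats/v7/libbeat/beat"
    "github.com/elastic/elastic-agent-libs/mapstr"
)

// connector is the minimal slice of the pipeline this sketch needs; the real
// pipeline satisfies it through the ConnectWith method quoted above.
type connector interface {
    ConnectWith(beat.ClientConfig) (beat.Client, error)
}

// publishOne connects with per-client processing configuration, publishes a
// single event, and closes the client again.
func publishOne(p connector) error {
    client, err := p.ConnectWith(beat.ClientConfig{
        Processing: beat.ProcessingConfig{
            Fields: mapstr.M{"input": mapstr.M{"type": "example"}},
        },
    })
    if err != nil {
        return err
    }
    defer client.Close()

    client.Publish(beat.Event{
        Timestamp: time.Now(),
        Fields:    mapstr.M{"message": "hello"},
    })
    return nil
}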

We probably can re-use the same beat.Client for all harvesters belonging to the same filestream input. Other inputs would have to implement their own solution.

> Is there a way to tell how many of the pipeline clients are idle and for how long?

I did not look into this. In my test case I had 33000 files with 100 lines each, so most of the harvesters/clients were certainly idle.

> We could just create another select case once we go past the limit on how many cases a single select can handle

That sounds like a pretty good idea! I can easily see that working.

cmacknz (Member) commented Mar 6, 2024

> We probably can re-use the same beat.Client for all harvesters belonging to the same filestream input.

We likely wouldn't want to do this, because the Publish method of the client executes the input processors while holding a single mutex. In effect this means only one event can be published at a time; the beat.Client is not concurrent. So if you share clients, you'll limit concurrency significantly.

> We could just create another select case once we go past the limit on how many cases a single select can handle
>
> That sounds like a pretty good idea! I can easily see that working.

Agreed, I think this is the idea to pursue first.
