
Improve performance of autodiscover #28524

Closed
vjsamuel wants to merge 6 commits into master from add_watch_worker

Conversation

vjsamuel
Contributor

@vjsamuel vjsamuel commented Oct 19, 2021

Enhancement

What does this PR do?

This PR does the following:

  • Add workers to the Kubernetes watcher so that pod objects are processed in parallel.
  • Fix the workqueue implementation: it was enqueueing whole objects, while the workqueue is supposed to deduplicate objects that are already in the queue (a keyed-queue sketch follows this list).
  • Make autodiscover accumulate updates before notifying the reloader. On startup, when there are a large number of monitorable entities, every discovered config triggered a reload of all configs, which took 20 minutes to process on some of our large clusters.
  • Change cfgfile.Hash to use xxHash, which is 9 times faster than the default hash used by hashstructure.
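To illustrate the workqueue fix above, here is a minimal, hypothetical sketch of a keyed queue that deduplicates pending work. It is not the actual beats implementation: the workQueue type, its methods, and the "namespace/name" keys are illustrative assumptions only.

package main

import (
    "fmt"
    "sync"
)

// workQueue is a hypothetical keyed queue: items are identified by a string
// key (for example "namespace/name"), so re-adding a key that is already
// pending is a no-op. This is the deduplication behaviour that enqueueing
// whole pod objects defeats.
type workQueue struct {
    mu      sync.Mutex
    order   []string            // FIFO order of pending keys
    pending map[string]struct{} // set of keys currently in the queue
}

func newWorkQueue() *workQueue {
    return &workQueue{pending: map[string]struct{}{}}
}

// Add enqueues a key unless it is already pending.
func (q *workQueue) Add(key string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if _, ok := q.pending[key]; ok {
        return // deduplicated: work for this object is already queued
    }
    q.pending[key] = struct{}{}
    q.order = append(q.order, key)
}

// Get pops the oldest pending key, or returns false if the queue is empty.
func (q *workQueue) Get() (string, bool) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.order) == 0 {
        return "", false
    }
    key := q.order[0]
    q.order = q.order[1:]
    delete(q.pending, key)
    return key, true
}

func main() {
    q := newWorkQueue()
    q.Add("kube-system/coredns-abc")
    q.Add("kube-system/coredns-abc") // duplicate update, dropped
    q.Add("default/nginx-123")
    for key, ok := q.Get(); ok; key, ok = q.Get() {
        fmt.Println("process", key)
    }
}

With keys instead of whole pod objects, a burst of updates for the same pod collapses into a single pending item, so the parallel workers do not repeatedly reprocess identical state.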

Why is it important?

On startup the creation of metricbeat/filebeat modules can take too long without these changes.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • [ ]

How to test this PR locally

Related issues

Use cases

Screenshots

Logs

@botelastic botelastic bot added the needs_team label (Indicates that the issue/PR needs a Team:* label) on Oct 19, 2021
@mergify
Contributor

mergify bot commented Oct 19, 2021

This pull request does not have a backport label. Could you fix it @vjsamuel? 🙏
To fix this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v\d.\d.\d is the label to automatically backport to the 7.\d branch (\d stands for a digit)

NOTE: backport-skip has been added to this pull request.

@mergify mergify bot added the backport-skip label (Skip notification from the automated backport with mergify) on Oct 19, 2021
@elasticmachine
Collaborator

elasticmachine commented Oct 19, 2021

💔 Tests Failed


Build stats

  • Start Time: 2021-10-19T17:01:59.880+0000

  • Duration: 78 min 36 sec

  • Commit: 7ca0822

Test stats 🧪

Test Results
  • Failed: 5
  • Passed: 20545
  • Skipped: 1467
  • Total: 22017

Test errors 5


Build&Test / heartbeat-pythonIntegTest / test_docker – heartbeat.tests.system.test_autodiscovery.TestAutodiscover
    Error details:

     beat.beat.TimeoutError: Timeout waiting for 'cond' to be true. Waited 10 seconds.

    Stacktrace:

     self = <test_autodiscovery.TestAutodiscover testMethod=test_docker>
    
        @unittest.skipIf(not INTEGRATION_TESTS or
                         os.getenv("TESTING_ENVIRONMENT") == "2x",
                         "integration test not available on 2.x")
        def test_docker(self):
            """
            Test docker autodiscover starts modules from templates
            """
            import docker
            docker_client = docker.from_env()
        
            self.render_config_template(
                autodiscover={
                    'docker': {
                        'templates': '''
                          - condition:
                              contains.docker.container.image: redis
                            config:
                              - type: tcp
                                id: myid
                                hosts: ["${data.host}:${data.port}"]
                                schedule: "@every 1s"
                                timeout: 1s
                        ''',
                    },
                },
            )
        
            proc = self.start_beat()
        
            self.wait_until(lambda: self.log_contains(
                re.compile('autodiscover.+Got a start event', re.I)))
        
    >       self.wait_until(lambda: self.output_count(lambda x: x >= 1))
    
    tests/system/test_autodiscovery.py:45: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_autodiscovery.TestAutodiscover testMethod=test_docker>
    cond = <function TestAutodiscover.test_docker.<locals>.<lambda> at 0x7faa30e61ca0>
    max_timeout = 10, poll_interval = 0.1, name = 'cond'
    
        def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
            """
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
    >               raise TimeoutError("Timeout waiting for '{}' to be true. ".format(name) +
                                       "Waited {} seconds.".format(max_timeout))
    E               beat.beat.TimeoutError: Timeout waiting for 'cond' to be true. Waited 10 seconds.
    
    ../libbeat/tests/system/beat/beat.py:362: TimeoutError 
    

Build&Test / libbeat-unitTest / TestAutodiscover – github.com/elastic/beats/v7/libbeat/autodiscover
    Error details:

     Failed

    Stacktrace:

     === RUN   TestAutodiscover
        autodiscover_test.go:518: Waiting for condition
    --- FAIL: TestAutodiscover (55.33s)
     
    

Build&Test / libbeat-goIntegTest / TestAutodiscover – github.com/elastic/beats/v7/libbeat/autodiscover
    Error details:

     Failed

    Stacktrace:

     === RUN   TestAutodiscover
        autodiscover_test.go:518: Waiting for condition
    --- FAIL: TestAutodiscover (55.34s)
     
    

Build&Test / filebeat-pythonIntegTest / test_default_settings – filebeat.tests.system.test_autodiscover.TestAutodiscover
    Error details:

     beat.beat.TimeoutError: Timeout waiting for 'cond' to be true. Waited 10 seconds.

    Stacktrace:

     self = <test_autodiscover.TestAutodiscover testMethod=test_default_settings>
    
        @unittest.skipIf(not INTEGRATION_TESTS or
                         os.getenv("TESTING_ENVIRONMENT") == "2x",
                         "integration test not available on 2.x")
        def test_default_settings(self):
            """
            Test docker autodiscover default config settings
            """
            with self.container_running() as container:
                self.render_config_template(
                    inputs=False,
                    autodiscover={
                        'docker': {
                            'cleanup_timeout': '0s',
                            'hints.enabled': 'true',
                            'hints.default_config': '''
                              type: log
                              paths:
                                - %s/${data.container.name}.log
                            ''' % self.working_dir,
                        },
                    },
                )
                proc = self.start_beat()
    >           self._test(container)
    
    tests/system/test_autodiscover.py:68: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    tests/system/test_autodiscover.py:77: in _test
        self.wait_until(lambda: self.log_contains('Starting runner: input'))
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_autodiscover.TestAutodiscover testMethod=test_default_settings>
    cond = <function TestAutodiscover._test.<locals>.<lambda> at 0x7f357b1d5ee0>
    max_timeout = 10, poll_interval = 0.1, name = 'cond'
    
        def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
            """
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
    >               raise TimeoutError("Timeout waiting for '{}' to be true. ".format(name) +
                                       "Waited {} seconds.".format(max_timeout))
    E               beat.beat.TimeoutError: Timeout waiting for 'cond' to be true. Waited 10 seconds.
    
    ../libbeat/tests/system/beat/beat.py:362: TimeoutError 
    

Build&Test / filebeat-pythonIntegTest / test_docker – filebeat.tests.system.test_autodiscover.TestAutodiscover
    Error details:

     beat.beat.TimeoutError: Timeout waiting for 'cond' to be true. Waited 10 seconds.

    Stacktrace:

     self = <test_autodiscover.TestAutodiscover testMethod=test_docker>
    
        @unittest.skipIf(not INTEGRATION_TESTS or
                         os.getenv("TESTING_ENVIRONMENT") == "2x",
                         "integration test not available on 2.x")
        def test_docker(self):
            """
            Test docker autodiscover starts input
            """
            with self.container_running() as container:
                self.render_config_template(
                    inputs=False,
                    autodiscover={
                        'docker': {
                            'cleanup_timeout': '0s',
                            'templates': f'''
                              - condition:
                                    equals.docker.container.name: {container.name}
                                config:
                                  - type: log
                                    paths:
                                      - %s/${{data.docker.container.name}}.log
                            ''' % self.working_dir,
                        },
                    },
                )
        
                proc = self.start_beat()
    >           self._test(container)
    
    tests/system/test_autodiscover.py:40: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    tests/system/test_autodiscover.py:77: in _test
        self.wait_until(lambda: self.log_contains('Starting runner: input'))
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_autodiscover.TestAutodiscover testMethod=test_docker>
    cond = <function TestAutodiscover._test.<locals>.<lambda> at 0x7f357b1613a0>
    max_timeout = 10, poll_interval = 0.1, name = 'cond'
    
        def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
            """
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
    >               raise TimeoutError("Timeout waiting for '{}' to be true. ".format(name) +
                                       "Waited {} seconds.".format(max_timeout))
    E               beat.beat.TimeoutError: Timeout waiting for 'cond' to be true. Waited 10 seconds.
    
    ../libbeat/tests/system/beat/beat.py:362: TimeoutError 
    

Steps errors 15


heartbeat-pythonIntegTest - mage pythonIntegTest
  • Took 2 min 35 sec
  • Description: mage pythonIntegTest
libbeat-unitTest - mage build unitTest
  • Took 3 min 23 sec
  • Description: mage build unitTest
libbeat-unitTest - mage build unitTest
  • Took 2 min 8 sec
  • Description: mage build unitTest
libbeat-unitTest - mage build unitTest
  • Took 2 min 5 sec
  • Description: mage build unitTest
gsutil -m -q cp -a public-read build/system-tests-*.tar.gz gs://beats-ci-temp/Beats/beats/PR-28524-4
  • Took 0 min 2 sec
  • Description: gsutil -m -q cp -a public-read build/system-tests-*.tar.gz gs://beats-ci-temp/Beats/beats/PR-28524-4
libbeat-goIntegTest - mage goIntegTest
  • Took 8 min 41 sec
  • Description: mage goIntegTest
libbeat-goIntegTest - mage goIntegTest
  • Took 4 min 4 sec
  • Description: mage goIntegTest
libbeat-goIntegTest - mage goIntegTest
  • Took 4 min 56 sec
  • Description: mage goIntegTest
gsutil -m -q cp -a public-read build/system-tests-*.tar.gz gs://beats-ci-temp/Beats/beats/PR-28524-4
  • Took 0 min 2 sec
  • Description: gsutil -m -q cp -a public-read build/system-tests-*.tar.gz gs://beats-ci-temp/Beats/beats/PR-28524-4
Error signal
  • Took 0 min 0 sec
  • Description: Error 'hudson.AbortException: script returned exit code 1'

🐛 Flaky test report

❕ There are test failures but not known flaky tests.


Genuine test errors 5

💔 There are test failures but not known flaky tests, most likely a genuine test failure.

  • Name: Build&Test / heartbeat-pythonIntegTest / test_docker – heartbeat.tests.system.test_autodiscovery.TestAutodiscover
  • Name: Build&Test / libbeat-unitTest / TestAutodiscover – github.com/elastic/beats/v7/libbeat/autodiscover
  • Name: Build&Test / libbeat-goIntegTest / TestAutodiscover – github.com/elastic/beats/v7/libbeat/autodiscover
  • Name: Build&Test / filebeat-pythonIntegTest / test_default_settings – filebeat.tests.system.test_autodiscover.TestAutodiscover
  • Name: Build&Test / filebeat-pythonIntegTest / test_docker – filebeat.tests.system.test_autodiscover.TestAutodiscover

🤖 GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

@jsoriano
Member

/test

@jsoriano
Member

/package

@jsoriano jsoriano added the Team:Integrations label (Label for the Integrations team) on Oct 19, 2021
@jsoriano jsoriano requested a review from ChrsMark October 19, 2021 17:01
@elasticmachine
Collaborator

Pinging @elastic/integrations (Team:Integrations)

@botelastic botelastic bot removed the needs_team label (Indicates that the issue/PR needs a Team:* label) on Oct 19, 2021
Member

@jsoriano jsoriano left a comment

Nice, thanks @vjsamuel!

return
}
// accumulate updates for every event that comes in so that we don't overwhelm
// the reloading engine with too many updates
Member

Good idea for high loads 👍 but maybe 10 seconds is too long a wait for normal use cases?

Maybe an option is an algorithm that waits up to a maximum of 10 (or more) seconds when there is a continuous stream of events, but continues earlier when events are isolated. It could be basically the same as now, but with an extra case branch on a timer that is reset every time an event arrives (or a time.After()):

var burstPeriod = 1 * time.Second
...
burstTicker := time.NewTicker(burstPeriod)
defer burstTicker.Stop()
...
for !doBreak {
  // restart the burst window on every iteration
  burstTicker.Reset(burstPeriod)
  select {
  case event := <-events: // the existing autodiscover event channel
    // keep accumulating this event
    ...
  case <-ticker.C: // the existing upper bound (e.g. 10s) was reached
    doBreak = true
  case <-burstTicker.C: // no event within burstPeriod, flush early
    doBreak = true
  }
}

Contributor

Just my 2 cents: should the hash result be cached? The reason for asking is that in the autodiscover flow the config hash is calculated twice, first during processing and then in reload, and reload actually reloads all the configs passed to the reload function; in the autodiscover case that is all the "discovered" configs, which can be a huge list.
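For context, the caching being suggested could look roughly like the sketch below. This is only a hypothetical illustration, not the existing cfgfile.Hash API: cachedConfig, raw, and Hash() are made-up names, and github.com/cespare/xxhash/v2 is used simply because this PR switches to xxHash.

package main

import (
    "fmt"
    "sync"

    "github.com/cespare/xxhash/v2"
)

// cachedConfig memoizes the hash of a serialized configuration so it is
// computed once and reused wherever the config is compared.
type cachedConfig struct {
    raw  string // stand-in for the serialized configuration
    once sync.Once
    hash uint64
}

// Hash returns the xxHash of the serialized config, computing it only once.
func (c *cachedConfig) Hash() uint64 {
    c.once.Do(func() {
        c.hash = xxhash.Sum64String(c.raw)
    })
    return c.hash
}

func main() {
    cfg := &cachedConfig{raw: "type: tcp\nhosts: [\"localhost:6379\"]"}
    fmt.Println(cfg.Hash()) // computed here...
    fmt.Println(cfg.Hash()) // ...and reused here, e.g. inside reload
}

Memoizing the result would let the same value be reused during event processing and again inside reload, instead of hashing every discovered config twice.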

Member

@jsoriano this is related to the other one you were working on at #29048, right?

Member

Well, #29048 can certainly reduce the number of configurations reloaded, but it doesn't do anything to reuse hash calculations. It can be a good idea to calculate them only once for each config, but I think this can be done as a different change.

go wait.Until(func() {
    for w.process(w.ctx) {
    }
}, time.Second*1, w.ctx.Done())
Member

Could this risk out-of-order processing of updates and deletes of the same resource? It could end up updating, and then adding back, a resource that has previously been deleted.
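For context on the concern above, a common controller pattern (not necessarily what this PR implements) is for workers to resolve each dequeued key against the latest cached state rather than acting on the event payload itself, so a stale update observed after a delete is simply processed as a delete. A purely illustrative sketch with hypothetical pod, store, and processKey names:

package main

import (
    "fmt"
    "sync"
)

// pod is a stand-in for the watched Kubernetes object.
type pod struct{ name string }

// store is a hypothetical thread-safe cache holding the latest observed state
// of each object, keyed by "namespace/name".
type store struct {
    mu   sync.RWMutex
    pods map[string]pod
}

func (s *store) get(key string) (pod, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    p, ok := s.pods[key]
    return p, ok
}

// processKey consults the latest cached state, not the queued event, so the
// outcome depends only on whether the object still exists when the worker
// gets to it.
func processKey(s *store, key string) {
    if p, ok := s.get(key); ok {
        fmt.Println("add/update:", p.name) // emit a start/update downstream
    } else {
        fmt.Println("delete:", key) // emit a stop downstream
    }
}

func main() {
    s := &store{pods: map[string]pod{"default/nginx-123": {name: "nginx-123"}}}
    processKey(s, "default/nginx-123") // object present: treated as add/update
    processKey(s, "default/gone-456")  // object absent: treated as delete
}

If the workers instead act directly on the queued event objects, the reordering risk raised above would apply.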

Member

@vjsamuel any follow-up on this?

@jsoriano
Member

@vjsamuel several tests related to autodiscover are failing, could you take a look?

@mergify
Contributor

mergify bot commented Nov 2, 2021

This pull request now has conflicts. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See the documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b add_watch_worker upstream/add_watch_worker
git merge upstream/master
git push upstream add_watch_worker

@ChrsMark
Member

Thank you for pushing these @vjsamuel! Any update on the status of this PR?

@mtojek
Contributor

mtojek commented Jan 28, 2022

No updates for 2 months and the PR has conflicts; closing.

@mtojek mtojek closed this Jan 28, 2022
@jsoriano
Member

OK with closing, but I think it would be worth looking at the ideas here; especially, accumulating events can help a lot on startup in big clusters.

@ChrsMark can you keep track of this?

Labels
backport-skip (Skip notification from the automated backport with mergify), Team:Integrations (Label for the Integrations team)
6 participants