-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathtest_logstash.py
70 lines (59 loc) · 2.2 KB
/
test_logstash.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import json
import metricbeat
import os
import semver
import sys
import time
import unittest
import urllib.error
import urllib.parse
import urllib.request
class Test(metricbeat.BaseTest):
    """Integration tests for the Metricbeat ``logstash`` module.

    Every test is gated on ``metricbeat.INTEGRATION_TESTS`` and talks to the
    hosts returned by ``self.get_hosts()``.
    """

    # Presumably the docker-compose services the base class brings up for
    # these tests -- confirm against metricbeat.BaseTest.
    COMPOSE_SERVICES = ['logstash']

    # Base field list passed to check_metricset() by the metricset tests.
    FIELDS = ['logstash']

    @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
    def test_node(self):
        """
        logstash node metricset test
        """
        # unittest.skip() merely builds a decorator; calling it inline is a
        # no-op and the test would be reported as *passed*. Raising SkipTest
        # makes the runner report it as skipped.
        raise unittest.SkipTest(
            'Skipping this test to check documented fields. '
            'We will unskip once we know which fields can be deleted')
        # Intentionally unreachable until the skip above is removed.
        self.check_metricset("logstash", "node", self.get_hosts(), self.FIELDS + ["process"])

    @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
    @unittest.skip("flaky test: https://github.com/elastic/beats/issues/26432")
    def test_node_stats(self):
        """
        logstash node_stats metricset test
        """
        # Same reasoning as in test_node: raise SkipTest rather than calling
        # unittest.skip() inline, so the test still reports as skipped once
        # the "flaky" decorator is dropped.
        raise unittest.SkipTest(
            'Skipping this test to check documented fields. '
            'We will unskip once we know which fields can be deleted')
        # Intentionally unreachable until the skip above is removed.
        self.check_metricset("logstash", "node_stats", self.get_hosts(), self.FIELDS)

    @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
    @unittest.skip("flaky; see https://github.com/elastic/beats/issues/13947")
    def test_xpack(self):
        """
        logstash-xpack module tests
        """
        version = self.get_version()
        if semver.compare(version, "7.3.0") == -1:
            # Skip for Logstash versions < 7.3.0 as necessary APIs not available
            raise unittest.SkipTest(
                "Logstash %s < 7.3.0: necessary APIs not available" % version)

        self.render_config_template(modules=[{
            "name": "logstash",
            "metricsets": ["node", "node_stats"],
            "hosts": self.get_hosts(),
            "period": "1s",
            "extras": {
                "xpack.enabled": "true"
            }
        }])

        proc = self.start_beat()
        # Block until the beat has produced at least one output line.
        self.wait_until(lambda: self.output_lines() > 0)
        proc.check_kill_and_wait()
        self.assert_no_logged_warnings()

    def get_version(self):
        """Return the ``version`` value reported by the first Logstash host.

        Issues an HTTP GET against the root path of ``self.get_hosts()[0]``
        and reads the ``"version"`` key from the JSON response body.
        """
        host = self.get_hosts()[0]
        # Use the response as a context manager so the connection is closed
        # even if reading or JSON decoding fails.
        with urllib.request.urlopen("http://" + host + "/") as res:
            body = json.loads(res.read())
        return body["version"]