-
Notifications
You must be signed in to change notification settings - Fork 314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change telemetry devices to rely on jvm.config instead of ES_JAVA_OPTS #711
Changes from 7 commits
32cdd67
3778aaf
9777aa5
35ec77d
8fad702
59e63b0
7b11bb1
1c82eee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,8 +23,8 @@ | |
import jinja2 | ||
|
||
from esrally import exceptions | ||
from esrally.mechanic import team, java_resolver | ||
from esrally.utils import io, process, versions | ||
from esrally.mechanic import team, java_resolver, telemetry | ||
from esrally.utils import io, process, versions, jvm | ||
|
||
|
||
def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id): | ||
|
@@ -38,11 +38,22 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_ | |
node_root_dir = "%s/%s" % (target_root, node_name) | ||
|
||
_, java_home = java_resolver.java_home(car, cfg) | ||
|
||
node_telemetry_dir = os.path.join(node_root_dir, "telemetry") | ||
java_major_version, java_home = java_resolver.java_home(car, cfg) | ||
enabled_devices = cfg.opts("mechanic", "telemetry.devices") | ||
telemetry_params = cfg.opts("mechanic", "telemetry.params") | ||
node_telemetry = [ | ||
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version), | ||
telemetry.JitCompiler(node_telemetry_dir), | ||
telemetry.Gc(node_telemetry_dir, java_major_version) | ||
] | ||
t = telemetry.Telemetry(enabled_devices, devices=node_telemetry) | ||
|
||
es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port) | ||
plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins] | ||
|
||
return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version) | ||
return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, t, distribution_version=distribution_version) | ||
|
||
|
||
def no_op_provisioner(): | ||
|
@@ -147,25 +158,27 @@ class BareProvisioner: | |
of the benchmark candidate to the appropriate place. | ||
""" | ||
|
||
def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, distribution_version=None, apply_config=_apply_config): | ||
def __init__(self, cluster_settings, es_installer, plugin_installers, preserve, telemetry=None, distribution_version=None, apply_config=_apply_config): | ||
self.preserve = preserve | ||
self._cluster_settings = cluster_settings | ||
self.es_installer = es_installer | ||
self.plugin_installers = plugin_installers | ||
self.distribution_version = distribution_version | ||
self.apply_config = apply_config | ||
self.telemetry = telemetry | ||
self.logger = logging.getLogger(__name__) | ||
|
||
def prepare(self, binary): | ||
if not self.preserve: | ||
logging.getLogger(__name__).info("Rally will delete the benchmark candidate after the benchmark") | ||
self.logger.info("Rally will delete the benchmark candidate after the benchmark") | ||
self.es_installer.install(binary["elasticsearch"]) | ||
# we need to immediately delete it as plugins may copy their configuration during installation. | ||
self.es_installer.delete_pre_bundled_configuration() | ||
|
||
# determine after installation because some variables will depend on the install directory | ||
target_root_path = self.es_installer.es_home_path | ||
provisioner_vars = self._provisioner_variables() | ||
|
||
for p in self.es_installer.config_source_paths: | ||
self.apply_config(p, target_root_path, provisioner_vars) | ||
|
||
|
@@ -185,6 +198,20 @@ def prepare(self, binary): | |
|
||
def cleanup(self): | ||
self.es_installer.cleanup(self.preserve) | ||
|
||
def _prepare_java_opts(self): | ||
java_opts = [] | ||
if self.telemetry is not None: | ||
java_opts.extend(self.telemetry.instrument_candidate_java_opts(self.es_installer.car, self.es_installer.node_name)) | ||
|
||
exit_on_oome_flag = "-XX:+ExitOnOutOfMemoryError" | ||
if jvm.supports_option(self.es_installer.java_home, exit_on_oome_flag): | ||
self.logger.info("Setting [%s] to detect out of memory errors during the benchmark.", exit_on_oome_flag) | ||
java_opts.append(exit_on_oome_flag) | ||
else: | ||
self.logger.info("JVM does not support [%s]. A JDK upgrade is recommended.", exit_on_oome_flag) | ||
|
||
return java_opts | ||
|
||
def _provisioner_variables(self): | ||
plugin_variables = {} | ||
|
@@ -217,6 +244,12 @@ def _provisioner_variables(self): | |
provisioner_vars.update(self.es_installer.variables) | ||
provisioner_vars.update(plugin_variables) | ||
provisioner_vars["cluster_settings"] = cluster_settings | ||
|
||
java_opts = self._prepare_java_opts() | ||
if java_opts: | ||
provisioner_vars["additional_java_settings"] = java_opts | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just one empty line is enough here. |
||
|
||
return provisioner_vars | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,17 +46,13 @@ def __init__(self, enabled_devices=None, devices=None): | |
self.enabled_devices = enabled_devices | ||
self.devices = devices | ||
|
||
def instrument_candidate_env(self, car, candidate_id): | ||
opts = {} | ||
def instrument_candidate_java_opts(self, car, candidate_id): | ||
opts = [] | ||
for device in self.devices: | ||
if self._enabled(device): | ||
additional_opts = device.instrument_env(car, candidate_id) | ||
additional_opts = device.instrument_java_opts(car, candidate_id) | ||
# properly merge values with the same key | ||
for k, v in additional_opts.items(): | ||
if k in opts: | ||
opts[k] = "%s %s" % (opts[k], v) | ||
else: | ||
opts[k] = v | ||
opts.extend(additional_opts) | ||
return opts | ||
|
||
def attach_to_cluster(self, cluster): | ||
|
@@ -108,7 +104,7 @@ class TelemetryDevice: | |
def __init__(self): | ||
self.logger = logging.getLogger(__name__) | ||
|
||
def instrument_env(self, car, candidate_id): | ||
def instrument_java_opts(self, car, candidate_id): | ||
return {} | ||
|
||
def attach_to_cluster(self, cluster): | ||
|
@@ -169,7 +165,7 @@ def __init__(self, telemetry_params, log_root, java_major_version): | |
self.log_root = log_root | ||
self.java_major_version = java_major_version | ||
|
||
def instrument_env(self, car, candidate_id): | ||
def instrument_java_opts(self, car, candidate_id): | ||
io.ensure_dir(self.log_root) | ||
log_file = "%s/%s-%s.jfr" % (self.log_root, car.safe_name, candidate_id) | ||
|
||
|
@@ -190,31 +186,32 @@ def instrument_env(self, car, candidate_id): | |
java_opts = self.java_opts(log_file) | ||
|
||
self.logger.info("jfr: Adding JVM arguments: [%s].", java_opts) | ||
return {"ES_JAVA_OPTS": java_opts} | ||
return java_opts | ||
|
||
def java_opts(self, log_file): | ||
recording_template = self.telemetry_params.get("recording-template") | ||
java_opts = "-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints " | ||
|
||
java_opts = ["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints"] | ||
jfr_cmd = "" | ||
if self.java_major_version < 11: | ||
java_opts += "-XX:+UnlockCommercialFeatures " | ||
java_opts.append("-XX:+UnlockCommercialFeatures") | ||
|
||
if self.java_major_version < 9: | ||
java_opts += "-XX:+FlightRecorder " | ||
java_opts += "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={} ".format(log_file) | ||
java_opts += "-XX:StartFlightRecording=defaultrecording=true" | ||
java_opts.append("-XX:+FlightRecorder") | ||
java_opts.append("-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={}".format(log_file)) | ||
jfr_cmd = "-XX:StartFlightRecording=defaultrecording=true" | ||
if recording_template: | ||
self.logger.info("jfr: Using recording template [%s].", recording_template) | ||
java_opts += ",settings={}".format(recording_template) | ||
jfr_cmd += ",settings={}".format(recording_template) | ||
else: | ||
self.logger.info("jfr: Using default recording template.") | ||
else: | ||
java_opts += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file) | ||
jfr_cmd += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file) | ||
if recording_template: | ||
self.logger.info("jfr: Using recording template [%s].", recording_template) | ||
java_opts += ",settings={}".format(recording_template) | ||
jfr_cmd += ",settings={}".format(recording_template) | ||
else: | ||
self.logger.info("jfr: Using default recording template.") | ||
java_opts.append(jfr_cmd) | ||
return java_opts | ||
|
||
|
||
|
@@ -228,12 +225,12 @@ def __init__(self, log_root): | |
super().__init__() | ||
self.log_root = log_root | ||
|
||
def instrument_env(self, car, candidate_id): | ||
def instrument_java_opts(self, car, candidate_id): | ||
io.ensure_dir(self.log_root) | ||
log_file = "%s/%s-%s.jit.log" % (self.log_root, car.safe_name, candidate_id) | ||
console.info("%s: Writing JIT compiler log to [%s]" % (self.human_name, log_file), logger=self.logger) | ||
return {"ES_JAVA_OPTS": "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation " | ||
"-XX:LogFile=%s -XX:+PrintAssembly" % log_file} | ||
return ["-XX:+UnlockDiagnosticVMOptions", "-XX:+TraceClassLoading", "-XX:+LogCompilation", | ||
"-XX:LogFile={}".format(log_file), "-XX:+PrintAssembly"] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is over-indented, see https://lintlyci.github.io/Flake8Rules/rules/E127.html; it would be great if we could align it. |
||
|
||
|
||
class Gc(TelemetryDevice): | ||
|
@@ -247,20 +244,20 @@ def __init__(self, log_root, java_major_version): | |
self.log_root = log_root | ||
self.java_major_version = java_major_version | ||
|
||
def instrument_env(self, car, candidate_id): | ||
def instrument_java_opts(self, car, candidate_id): | ||
io.ensure_dir(self.log_root) | ||
log_file = "%s/%s-%s.gc.log" % (self.log_root, car.safe_name, candidate_id) | ||
console.info("%s: Writing GC log to [%s]" % (self.human_name, log_file), logger=self.logger) | ||
return self.java_opts(log_file) | ||
|
||
def java_opts(self, log_file): | ||
if self.java_major_version < 9: | ||
return {"ES_JAVA_OPTS": "-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps " | ||
"-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime " | ||
"-XX:+PrintTenuringDistribution" % log_file} | ||
return ["-Xloggc:{}".format(log_file), "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps", "-XX:+PrintGCTimeStamps", | ||
"-XX:+PrintGCApplicationStoppedTime", "-XX:+PrintGCApplicationConcurrentTime", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A similar PEP 8 style comment applies here: https://lintlyci.github.io/Flake8Rules/rules/E127.html |
||
"-XX:+PrintTenuringDistribution"] | ||
else: | ||
# see https://docs.oracle.com/javase/9/tools/java.htm#JSWOR-GUID-BE93ABDC-999C-4CB5-A88B-1994AAAC74D5 | ||
return {"ES_JAVA_OPTS": "-Xlog:gc*=info,safepoint=info,age*=trace:file=%s:utctime,uptimemillis,level,tags:filecount=0" % log_file} | ||
return ["-Xlog:gc*=info,safepoint=info,age*=trace:file={}:utctime,uptimemillis,level,tags:filecount=0".format(log_file)] | ||
|
||
|
||
class CcrStats(TelemetryDevice): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for the extra whitespace on this empty line.