Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support data streams in create-track #1531

Merged
merged 18 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,17 @@ def add_track_source(subparser):
required=True,
help="Name of the generated track",
)
create_track_parser.add_argument(
indices_or_data_streams_group = create_track_parser.add_mutually_exclusive_group(required=True)
indices_or_data_streams_group.add_argument(
"--indices",
type=non_empty_list,
required=True,
help="Comma-separated list of indices to include in the track",
)
indices_or_data_streams_group.add_argument(
"--data-streams",
type=non_empty_list,
help="Comma-separated list of data streams to include in the track",
)
create_track_parser.add_argument(
"--target-hosts",
default="",
Expand Down Expand Up @@ -987,9 +992,16 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
generate(cfg)
elif sub_command == "create-track":
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
if args.data_streams is not None:
cfg.add(config.Scope.applicationOverride, "generator", "indices", "*")
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
elif args.indices is not None:
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
configure_connection_params(arg_parser, args, cfg)

tracker.create_track(cfg)
Expand Down
28 changes: 24 additions & 4 deletions esrally/tracker/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ def update_index_setting_parameters(settings):
settings[s] = param.format(orig=orig_value)


def is_valid(index_name, index_pattern):
    """
    Decides whether an index should be included in the generated track.

    :param index_name: name of the index to check.
    :param index_pattern: the index pattern the user requested (e.g. ``"_all"``,
        ``"*"``, or one or more explicit index names).
    :return: a ``(valid, reason)`` tuple; ``reason`` is ``None`` when the index
        is valid, otherwise a human-readable explanation.
    """
    if len(index_name) == 0:
        return False, "Index name is empty"
    # When the indices are requested directly (with --data-streams or --indices) then we honor the
    # request, even if it includes hidden indices. But when asking for all indices we skip hidden
    # indices as they could be system indices and restoring them to another cluster would break it.
    if index_pattern in ("_all", "*") and index_name.startswith("."):
        return False, f"Index [{index_name}] is hidden"
    return True, None

Expand All @@ -65,9 +68,9 @@ def extract_index_mapping_and_settings(client, index_pattern):
results = {}
logger = logging.getLogger(__name__)
# the response might contain multiple indices if a wildcard was provided
response = client.indices.get(index=index_pattern)
response = client.indices.get(index=index_pattern, params={"expand_wildcards": "all"})
for index, details in response.items():
valid, reason = is_valid(index)
valid, reason = is_valid(index, index_pattern)
if valid:
mappings = details["mappings"]
index_settings = filter_ephemeral_index_settings(details["settings"]["index"])
Expand Down Expand Up @@ -104,3 +107,20 @@ def extract(client, outdir, index_pattern):
}
)
return results


def extract_indices_from_data_stream(client, data_stream_pattern):
    """
    Resolves a data stream pattern to the names of the matching data streams.

    Note that this returns data stream *names* (which callers can use as index
    patterns), not the names of the backing ``.ds-*`` indices.

    :param client: Elasticsearch client
    :param data_stream_pattern: name (or wildcard pattern) of the data stream(s)
    :return: list of matching data stream names
    """
    # The response might contain multiple data streams if a wildcard was provided.
    # filter_path trims the response to just the names; expand_wildcards includes
    # hidden data streams because an explicit request is honored as-is.
    params_defined = {"expand_wildcards": "all", "filter_path": "data_streams.name"}
    response = client.indices.get_data_stream(name=data_stream_pattern, params=params_defined)
    # With filter_path, Elasticsearch omits the "data_streams" key entirely when
    # nothing matches, so guard against a missing key instead of raising KeyError.
    return [data_stream["name"] for data_stream in response.get("data_streams", [])]
22 changes: 20 additions & 2 deletions esrally/tracker/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ def process_template(templates_path, template_filename, template_vars, output_pa
f.write(template.render(template_vars))


def extract_indices_from_data_streams(client, data_streams_to_extract):
    """
    Resolves each requested data stream pattern to the names of the matching
    data streams (which are then usable as index patterns).

    Resolution is best-effort: a failure for one pattern is logged and skipped
    so that a single bad pattern does not abort track creation for the rest.

    :param client: Elasticsearch client
    :param data_streams_to_extract: iterable of data stream names or wildcard patterns
    :return: list of resolved data stream names
    """
    indices = []
    for data_stream_name in data_streams_to_extract:
        try:
            indices += index.extract_indices_from_data_stream(client, data_stream_name)
        except ElasticsearchException:
            logging.getLogger(__name__).exception("Failed to extract indices from data stream [%s]", data_stream_name)

    return indices


def extract_mappings_and_corpora(client, output_path, indices_to_extract):
indices = []
corpora = []
Expand Down Expand Up @@ -63,8 +76,7 @@ def create_track(cfg):
root_path = cfg.opts("generator", "output.path")
target_hosts = cfg.opts("client", "hosts")
client_options = cfg.opts("client", "options")

logger.info("Creating track [%s] matching indices [%s]", track_name, indices)
data_streams = cfg.opts("generator", "data_streams")

client = EsClientFactory(
hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]
Expand All @@ -76,6 +88,12 @@ def create_track(cfg):
output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), track_name))
io.ensure_dir(output_path)

if data_streams is not None:
logger.info("Creating track [%s] matching data streams [%s]", track_name, data_streams)
extracted_indices = extract_indices_from_data_streams(client, data_streams)
indices = extracted_indices
logger.info("Creating track [%s] matching indices [%s]", track_name, indices)

indices, corpora = extract_mappings_and_corpora(client, output_path, indices)
if len(indices) == 0:
raise RuntimeError("Failed to extract any indices for track!")
Expand Down
35 changes: 32 additions & 3 deletions tests/tracker/index_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from esrally.tracker.index import (
extract_index_mapping_and_settings,
extract_indices_from_data_stream,
filter_ephemeral_index_settings,
update_index_setting_parameters,
)
Expand Down Expand Up @@ -116,7 +117,9 @@ def test_extract_index_create(client):
},
},
}
expected = {

res = extract_index_mapping_and_settings(client, "_all")
assert res == {
"osmgeopoints": {
"mappings": {
"dynamic": "strict",
Expand Down Expand Up @@ -145,5 +148,31 @@ def test_extract_index_create(client):
},
},
}
res = extract_index_mapping_and_settings(client, "_all")
assert res == expected


@mock.patch("elasticsearch.Elasticsearch")
def test_extract_indices_from_data_stream(client):
    # One data stream matches the wildcard pattern. Only its "name" field may
    # end up in the result -- never the names of the backing ".ds-*" indices.
    get_data_stream_response = {
        "data_streams": [
            {
                "name": "metrics-kubernetes.event-default",
                "timestamp_field": {"name": "@timestamp"},
                "indices": [
                    {"index_name": ".ds-metrics-kubernetes.event-default-2022.06.20-000001", "index_uuid": "0W8L56dKQoGXjkGQc8mfzg"}
                ],
                "generation": 1,
                "_meta": {"description": "default metrics template installed by x-pack", "managed": "true"},
                "status": "GREEN",
                "template": "metrics",
                "ilm_policy": "metrics",
                "hidden": "false",
                "system": "false",
                "allow_custom_routing": "false",
                "replicated": "false",
            }
        ]
    }
    client.indices.get_data_stream.return_value = get_data_stream_response

    extracted = extract_indices_from_data_stream(client, ["metrics-kubernetes-*"])

    assert extracted == ["metrics-kubernetes.event-default"]