[Fixing create-track] Support data streams in create-track (#1531)
* First commit with indexes extracted

* Commit to allow data-stream usage for create-track command

Commit to allow data-stream usage even without the indices flag

Updating error handling

Fixing lint errors

Fixing format errors

* Following PR review recommendations

* Adding more meaningful variable names

* Adding more meaningful variable names and fixing lint

* Updating conditions for arguments

* Adding test for create_track with datastream argument

* Removing tests as a new story was created to track this

* Removing tests as a new story was created to track this

* Use argparse to handle mutually exclusive options

* Spell data stream in two words

* Fix lint

* Simplify conditional

* Removing hardcoded allowed list of indices

* Fixing a missing test variable that made unit tests fail, and adding comments to functions

* Adding handling of hidden indices

* Adding a check for the _all filter and a unit test for data streams

* Add * and explain reasoning behind special case

Co-authored-by: Quentin Pradet <[email protected]>
gizas and pquentin authored Jul 11, 2022
1 parent b00f4f0 commit 693d728
Showing 4 changed files with 93 additions and 14 deletions.
22 changes: 17 additions & 5 deletions esrally/rally.py
@@ -179,12 +179,17 @@ def add_track_source(subparser):
required=True,
help="Name of the generated track",
)
create_track_parser.add_argument(
indices_or_data_streams_group = create_track_parser.add_mutually_exclusive_group(required=True)
indices_or_data_streams_group.add_argument(
"--indices",
type=non_empty_list,
required=True,
help="Comma-separated list of indices to include in the track",
)
indices_or_data_streams_group.add_argument(
"--data-streams",
type=non_empty_list,
help="Comma-separated list of data streams to include in the track",
)
create_track_parser.add_argument(
"--target-hosts",
default="",
@@ -987,9 +992,16 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
generate(cfg)
elif sub_command == "create-track":
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
if args.data_streams is not None:
cfg.add(config.Scope.applicationOverride, "generator", "indices", "*")
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
elif args.indices is not None:
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "data_streams", args.data_streams)
cfg.add(config.Scope.applicationOverride, "generator", "output.path", args.output_path)
cfg.add(config.Scope.applicationOverride, "track", "track.name", args.track)
configure_connection_params(arg_parser, args, cfg)

tracker.create_track(cfg)
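
The mutually exclusive group is what lets argparse reject --indices combined with --data-streams while still requiring at least one of them, which is why dispatch_sub_command above only needs to check which attribute is not None. A minimal, self-contained sketch of that behavior; the real parser also applies the non_empty_list converter and defines many more create-track options, and the prog name here is illustrative only:

```python
import argparse

# Sketch of the mutually exclusive --indices / --data-streams options added in rally.py.
parser = argparse.ArgumentParser(prog="esrally create-track")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--indices", help="Comma-separated list of indices to include in the track")
group.add_argument("--data-streams", help="Comma-separated list of data streams to include in the track")

args = parser.parse_args(["--data-streams", "metrics-*"])
print(args.indices, args.data_streams)  # None metrics-*  -> the data stream branch is taken
# Passing both options, or neither, makes argparse exit with a usage error before any
# of the create-track code runs.
```
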
28 changes: 24 additions & 4 deletions esrally/tracker/index.py
@@ -46,10 +46,13 @@ def update_index_setting_parameters(settings):
settings[s] = param.format(orig=orig_value)


def is_valid(index_name):
def is_valid(index_name, index_pattern):
if len(index_name) == 0:
return False, "Index name is empty"
if index_name.startswith("."):
# When the indices are requested directly (with --data-streams or --indices) then we honor the
# request, even if it includes hidden indices. But when asking for all indices we skip hidden
# indices as they could be system indices and restoring them to another cluster would break it.
if index_pattern in ("_all", "*") and index_name.startswith("."):
return False, f"Index [{index_name}] is hidden"
return True, None
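
The effect of threading index_pattern through is_valid can be illustrated with a few calls against the function as defined above (the index names are hypothetical):

```python
from esrally.tracker.index import is_valid

# Hidden indices are skipped only for the catch-all patterns "_all" and "*":
print(is_valid(".ds-logs-000001", "_all"))   # (False, 'Index [.ds-logs-000001] is hidden')
print(is_valid(".ds-logs-000001", ".ds-*"))  # (True, None) -- explicitly requested, so honored
print(is_valid("osmgeopoints", "_all"))      # (True, None)
print(is_valid("", "logs-*"))                # (False, 'Index name is empty')
```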

@@ -65,9 +68,9 @@ def extract_index_mapping_and_settings(client, index_pattern):
results = {}
logger = logging.getLogger(__name__)
# the response might contain multiple indices if a wildcard was provided
response = client.indices.get(index=index_pattern)
response = client.indices.get(index=index_pattern, params={"expand_wildcards": "all"})
for index, details in response.items():
valid, reason = is_valid(index)
valid, reason = is_valid(index, index_pattern)
if valid:
mappings = details["mappings"]
index_settings = filter_ephemeral_index_settings(details["settings"]["index"])
@@ -104,3 +107,20 @@ def extract(client, outdir, index_pattern):
}
)
return results


def extract_indices_from_data_stream(client, data_stream_pattern):
"""
Calls Elasticsearch client get_data_stream function to retrieve list of indices
:param client: Elasticsearch client
:param data_stream_pattern: name of data stream
:return: list of index names
"""
results = []
# the response might contain multiple indices if a wildcard was provided
params_defined = {"expand_wildcards": "all", "filter_path": "data_streams.name"}
results_data_streams = client.indices.get_data_stream(name=data_stream_pattern, params=params_defined)

for indices in results_data_streams["data_streams"]:
results.append(indices.get("name"))
return results
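
Despite its name, the helper above returns the matching data stream names rather than backing index names, since filter_path keeps only data_streams.name; those names are then used as index patterns later in the pipeline. A small usage sketch with a mocked client whose response shape mirrors the unit test added further down:

```python
from unittest import mock

from esrally.tracker.index import extract_indices_from_data_stream

client = mock.Mock()
client.indices.get_data_stream.return_value = {
    "data_streams": [{"name": "metrics-kubernetes.event-default"}]
}
print(extract_indices_from_data_stream(client, "metrics-kubernetes-*"))
# ['metrics-kubernetes.event-default']
```
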
22 changes: 20 additions & 2 deletions esrally/tracker/tracker.py
@@ -35,6 +35,19 @@ def process_template(templates_path, template_filename, template_vars, output_path):
f.write(template.render(template_vars))


def extract_indices_from_data_streams(client, data_streams_to_extract):
indices = []
# first extract index metadata (which is cheap) and defer extracting data to reduce the potential for
# errors due to invalid index names late in the process.
for data_stream_name in data_streams_to_extract:
try:
indices += index.extract_indices_from_data_stream(client, data_stream_name)
except ElasticsearchException:
logging.getLogger(__name__).exception("Failed to extract indices from data stream [%s]", data_stream_name)

return indices


def extract_mappings_and_corpora(client, output_path, indices_to_extract):
indices = []
corpora = []
@@ -63,8 +76,7 @@ def create_track(cfg):
root_path = cfg.opts("generator", "output.path")
target_hosts = cfg.opts("client", "hosts")
client_options = cfg.opts("client", "options")

logger.info("Creating track [%s] matching indices [%s]", track_name, indices)
data_streams = cfg.opts("generator", "data_streams")

client = EsClientFactory(
hosts=target_hosts.all_hosts[opts.TargetHosts.DEFAULT], client_options=client_options.all_client_options[opts.TargetHosts.DEFAULT]
@@ -76,6 +88,12 @@
output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), track_name))
io.ensure_dir(output_path)

if data_streams is not None:
logger.info("Creating track [%s] matching data streams [%s]", track_name, data_streams)
extracted_indices = extract_indices_from_data_streams(client, data_streams)
indices = extracted_indices
logger.info("Creating track [%s] matching indices [%s]", track_name, indices)

indices, corpora = extract_mappings_and_corpora(client, output_path, indices)
if len(indices) == 0:
raise RuntimeError("Failed to extract any indices for track!")
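
One detail worth calling out in extract_indices_from_data_streams is the per-data-stream try/except: a pattern that cannot be resolved is logged and skipped instead of aborting the whole create-track run. A sketch of that behavior with a mocked client (the pattern names are hypothetical, and ElasticsearchException comes from the 7.x elasticsearch client, the same exception the code above catches):

```python
from unittest import mock

from elasticsearch import ElasticsearchException

from esrally.tracker.tracker import extract_indices_from_data_streams

client = mock.Mock()
client.indices.get_data_stream.side_effect = [
    ElasticsearchException("simulated failure"),                        # first pattern fails
    {"data_streams": [{"name": "metrics-kubernetes.event-default"}]},   # second succeeds
]
# The failure is logged with a traceback and the remaining data streams are still resolved.
print(extract_indices_from_data_streams(client, ["logs-broken-*", "metrics-kubernetes-*"]))
# ['metrics-kubernetes.event-default']
```
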
35 changes: 32 additions & 3 deletions tests/tracker/index_test.py
@@ -19,6 +19,7 @@

from esrally.tracker.index import (
extract_index_mapping_and_settings,
extract_indices_from_data_stream,
filter_ephemeral_index_settings,
update_index_setting_parameters,
)
@@ -116,7 +117,9 @@ def test_extract_index_create(client):
},
},
}
expected = {

res = extract_index_mapping_and_settings(client, "_all")
assert res == {
"osmgeopoints": {
"mappings": {
"dynamic": "strict",
@@ -145,5 +148,31 @@
},
},
}
res = extract_index_mapping_and_settings(client, "_all")
assert res == expected


@mock.patch("elasticsearch.Elasticsearch")
def test_extract_indices_from_data_stream(client):
data_streams_filter = ["metrics-kubernetes-*"]
client.indices.get_data_stream.return_value = {
"data_streams": [
{
"name": "metrics-kubernetes.event-default",
"timestamp_field": {"name": "@timestamp"},
"indices": [
{"index_name": ".ds-metrics-kubernetes.event-default-2022.06.20-000001", "index_uuid": "0W8L56dKQoGXjkGQc8mfzg"}
],
"generation": 1,
"_meta": {"description": "default metrics template installed by x-pack", "managed": "true"},
"status": "GREEN",
"template": "metrics",
"ilm_policy": "metrics",
"hidden": "false",
"system": "false",
"allow_custom_routing": "false",
"replicated": "false",
}
]
}

res = extract_indices_from_data_stream(client, data_streams_filter)
assert res == ["metrics-kubernetes.event-default"]
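
The hidden-index special case itself is not covered by the tests in this commit (the commit message notes that the removed tests are tracked in a separate story). A minimal sketch of what such a test might look like, in the same mock style as the tests above, with a hypothetical index name and settings:

```python
from unittest import mock

from esrally.tracker.index import extract_index_mapping_and_settings


@mock.patch("elasticsearch.Elasticsearch")
def test_hidden_index_is_skipped_for_catch_all_pattern(client):
    client.indices.get.return_value = {
        ".ds-logs-000001": {"mappings": {}, "settings": {"index": {"number_of_shards": "1"}}},
    }
    # With the catch-all pattern, the hidden index is filtered out by is_valid.
    assert extract_index_mapping_and_settings(client, "_all") == {}
```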
