Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Datastreams #1092

Merged
merged 9 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def register_default_runners():
register_runner(track.OperationType.Refresh.name, Retry(Refresh()), async_runner=True)
register_runner(track.OperationType.CreateIndex.name, Retry(CreateIndex()), async_runner=True)
register_runner(track.OperationType.DeleteIndex.name, Retry(DeleteIndex()), async_runner=True)
register_runner(track.OperationType.CreateDataStream.name, Retry(CreateDataStream()), async_runner=True)
register_runner(track.OperationType.DeleteDataStream.name, Retry(DeleteDataStream()), async_runner=True)
register_runner(track.OperationType.CreateIndexTemplate.name, Retry(CreateIndexTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteIndexTemplate.name, Retry(DeleteIndexTemplate()), async_runner=True)
register_runner(track.OperationType.ShrinkIndex.name, Retry(ShrinkIndex()), async_runner=True)
Expand Down Expand Up @@ -1031,6 +1033,22 @@ def __repr__(self, *args, **kwargs):
return "create-index"


class CreateDataStream(Runner):
"""
Execute the `create data stream API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-data-stream.html>`_.
"""

async def __call__(self, es, params):
data_streams = mandatory(params, "data-streams", self)
request_params = params.get("request-params", {})
gingerwizard marked this conversation as resolved.
Show resolved Hide resolved
for data_stream in data_streams:
await es.indices.create_data_stream(data_stream, params=request_params)
return len(data_streams), "ops"

def __repr__(self, *args, **kwargs):
return "create-data-stream"


class DeleteIndex(Runner):
"""
Execute the `delete index API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html>`_.
Expand Down Expand Up @@ -1058,6 +1076,33 @@ def __repr__(self, *args, **kwargs):
return "delete-index"


class DeleteDataStream(Runner):
"""
Execute the `delete data stream API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-data-stream.html>`_.
"""

async def __call__(self, es, params):
ops = 0

data_streams = mandatory(params, "data-streams", self)
only_if_exists = params.get("only-if-exists", False)
gingerwizard marked this conversation as resolved.
Show resolved Hide resolved
request_params = params.get("request-params", {})
gingerwizard marked this conversation as resolved.
Show resolved Hide resolved

for data_stream in data_streams:
if not only_if_exists:
await es.indices.delete_data_stream(data_stream, ignore=[404], params=request_params)
ops += 1
elif only_if_exists and await es.indices.exists(index=data_stream):
self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream)
await es.indices.delete_data_stream(data_stream, params=request_params)
ops += 1

return ops, "ops"

def __repr__(self, *args, **kwargs):
return "delete-data-stream"


class CreateIndexTemplate(Runner):
"""
Execute the `PUT index template API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html>`_.
Expand Down
24 changes: 23 additions & 1 deletion esrally/resources/track-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,24 @@
]
}
},
"data-streams": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": {
"title": "Index",
gingerwizard marked this conversation as resolved.
Show resolved Hide resolved
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the data stream to create."
}
},
"required": [
"name"
]
}
},
"corpora": {
"type": "array",
"minItems": 1,
Expand Down Expand Up @@ -291,6 +309,10 @@
"type": "string",
"description": "The name of the associated index (if any)."
},
"target-data-stream": {
"type": "string",
"description": "The name of the associated data stream (if any)."
},
"target-type": {
"type": "string",
"description": "The name of the associated document type (if any)."
Expand Down Expand Up @@ -469,4 +491,4 @@
"$ref": "#/definitions/schedule"
}
}
}
}
39 changes: 32 additions & 7 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,13 +987,15 @@ def __call__(self, track_name, track_specification, mapping_dir):
meta_data = self._r(track_specification, "meta", mandatory=False)
indices = [self._create_index(idx, mapping_dir)
for idx in self._r(track_specification, "indices", mandatory=False, default_value=[])]
data_streams = [self._create_data_stream(idx)
for idx in self._r(track_specification, "data-streams", mandatory=False, default_value=[])]
templates = [self._create_index_template(tpl, mapping_dir)
for tpl in self._r(track_specification, "templates", mandatory=False, default_value=[])]
corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), indices)
corpora = self._create_corpora(self._r(track_specification, "corpora", mandatory=False, default_value=[]), indices, data_streams)
challenges = self._create_challenges(track_specification)
# at this point, *all* track params must have been referenced in the templates
return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges, indices=indices,
templates=templates, corpora=corpora)
return track.Track(name=self.name, meta_data=meta_data, description=description, challenges=challenges,
indices=indices, data_streams=data_streams, templates=templates, corpora=corpora)

def _error(self, msg):
raise TrackSyntaxError("Track '%s' is invalid. %s" % (self.name, msg))
Expand Down Expand Up @@ -1031,6 +1033,9 @@ def _create_index(self, index_spec, mapping_dir):

return track.Index(name=index_name, body=body, types=self._r(index_spec, "types", mandatory=False, default_value=[]))

def _create_data_stream(self, data_stream_spec):
return track.DataStream(name=self._r(data_stream_spec, "name"))

def _create_index_template(self, tpl_spec, mapping_dir):
name = self._r(tpl_spec, "name")
template_file = self._r(tpl_spec, "template")
Expand All @@ -1056,7 +1061,7 @@ def _load_template(self, contents, description):
self.logger.exception("Could not load file template for %s.", description)
raise TrackSyntaxError("Could not load file template for '%s'" % description, str(e))

def _create_corpora(self, corpora_specs, indices):
def _create_corpora(self, corpora_specs, indices, data_streams):
document_corpora = []
known_corpora_names = set()
for corpus_spec in corpora_specs:
Expand All @@ -1077,6 +1082,11 @@ def _create_corpora(self, corpora_specs, indices):
else:
corpus_target_idx = self._r(corpus_spec, "target-index", mandatory=False)

if len(data_streams) == 1:
corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False, default_value=data_streams[0].name)
else:
corpus_target_ds = self._r(corpus_spec, "target-data-stream", mandatory=False)

if len(indices) == 1 and len(indices[0].types) == 1:
corpus_target_type = self._r(corpus_spec, "target-type", mandatory=False, default_value=indices[0].types[0])
else:
Expand Down Expand Up @@ -1104,12 +1114,26 @@ def _create_corpora(self, corpora_specs, indices):
target_idx = None
target_type = None
else:
target_ds = self._r(doc_spec, "target-data-stream", mandatory=False,
error_ctx=docs)

# we need an index if no meta-data are present.
target_idx = self._r(doc_spec, "target-index", mandatory=corpus_target_idx is None,
default_value=corpus_target_idx, error_ctx=docs)
target_idx = self._r(doc_spec, "target-index",
mandatory=corpus_target_idx is None and corpus_target_ds is None
and target_ds is None,
error_ctx=docs)
target_type = self._r(doc_spec, "target-type", mandatory=False,
default_value=corpus_target_type, error_ctx=docs)

# here we choose to use either an index or data streams. If either are explicitly specified
# (index takes precedence) this is preferred over any defaults. Index then takes precedence.
gingerwizard marked this conversation as resolved.
Show resolved Hide resolved
if target_idx:
target_ds = None
elif target_ds is None and corpus_target_idx:
target_idx = corpus_target_idx
elif target_ds is None:
target_ds = corpus_target_ds

docs = track.Documents(source_format=source_format,
document_file=document_file,
document_archive=document_archive,
Expand All @@ -1118,7 +1142,8 @@ def _create_corpora(self, corpora_specs, indices):
number_of_documents=num_docs,
compressed_size_in_bytes=compressed_bytes,
uncompressed_size_in_bytes=uncompressed_bytes,
target_index=target_idx, target_type=target_type)
target_index=target_idx, target_type=target_type,
target_data_stream=target_ds)
corpus.documents.append(docs)
else:
self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name))
Expand Down
Loading