Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add use-create option to fail dups #1458

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions esrally/track/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,8 @@ def __init__(self, track, params, **kwargs):
self.on_conflict = None
self.recency = None

self.use_create = params.get("use-create", False)

self.corpora = self.used_corpora(track, params)

if len(self.corpora) == 0:
Expand Down Expand Up @@ -644,6 +646,7 @@ def __init__(self, track, params, **kwargs):
self.on_conflict,
self.recency,
self.pipeline,
self.use_create,
self._params,
)

Expand Down Expand Up @@ -705,6 +708,7 @@ def __init__(
on_conflict,
recency,
pipeline=None,
use_create=False,
original_params=None,
):
"""
Expand All @@ -719,6 +723,8 @@ def __init__(
:param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones.
May be None.
:param pipeline: The name of the ingest pipeline to run.
:param use_create: Should generated bulk operations "create" so that duplicate `_id`s fail (True) or use "index" so that duplicate
`_id`s overwrite (False).
:param original_params: The original dict passed to the parent parameter source.
"""
self.corpora = corpora
Expand All @@ -732,6 +738,7 @@ def __init__(
self.on_conflict = on_conflict
self.recency = recency
self.pipeline = pipeline
self.use_create = use_create
self.original_params = original_params
# this is only intended for unit-testing
self.create_reader = original_params.pop("__create_reader", create_default_reader)
Expand Down Expand Up @@ -778,6 +785,7 @@ def _init_internal_params(self):
self.recency,
self.pipeline,
self.original_params,
self.use_create,
self.create_reader,
)

Expand Down Expand Up @@ -912,11 +920,10 @@ def chain(*iterables):


def create_default_reader(
docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency
docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency, use_create
):
source = Slice(io.MmapSource, offset, num_lines)
target = None
use_create = False
if docs.target_index:
target = docs.target_index
elif docs.target_data_stream:
Expand Down Expand Up @@ -952,6 +959,7 @@ def create_readers(
conflict_probability: float,
on_conflict: str,
recency: str,
use_create: bool,
create_reader: Callable[..., IndexDataReader],
) -> list[IndexDataReader]:
"""
Expand Down Expand Up @@ -982,7 +990,7 @@ def create_readers(
)
if num_docs > 0:
reader: IndexDataReader = create_reader(
docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency
docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency, use_create
)
reader_queue.append(reader)
total_readers += 1
Expand Down Expand Up @@ -1065,6 +1073,7 @@ def bulk_data_based(
recency,
pipeline,
original_params,
use_create,
create_reader=create_default_reader,
):
"""
Expand Down Expand Up @@ -1098,6 +1107,7 @@ def bulk_data_based(
conflict_probability,
on_conflict,
recency,
use_create,
create_reader,
)
return bulk_generator(chain(*readers), pipeline, original_params)
Expand Down
3 changes: 3 additions & 0 deletions tests/track/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,7 @@ def test_generate_two_bulks(self):
on_conflict=None,
recency=None,
pipeline=None,
use_create=False,
original_params={"my-custom-parameter": "foo", "my-custom-parameter-2": True},
create_reader=self.create_test_reader([["1", "2", "3", "4", "5"], ["6", "7", "8"]]),
)
Expand Down Expand Up @@ -1431,6 +1432,7 @@ def test_generate_bulks_from_multiple_corpora(self):
on_conflict=None,
recency=None,
pipeline=None,
use_create=False,
original_params={"my-custom-parameter": "foo", "my-custom-parameter-2": True},
create_reader=self.create_test_reader([["1", "2", "3", "4", "5"]]),
)
Expand Down Expand Up @@ -1553,6 +1555,7 @@ def test_internal_params_take_precedence(self):
on_conflict=None,
recency=None,
pipeline=None,
use_create=False,
original_params={"body": "foo", "custom-param": "bar"},
create_reader=self.create_test_reader([["1", "2", "3"]]),
)
Expand Down