Allow conflict-probability value of 0
For the use case of externally generated ids (no updates), allow
setting conflict-probability to the value 0.

Relates elastic#510
dliappis committed May 25, 2018
1 parent d6daddd commit ccb2bfc
Showing 3 changed files with 57 additions and 4 deletions.
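
To make the commit message's use case concrete, here is a minimal Python sketch (illustration only, not part of the commit; the index and type names are borrowed from the tests below) contrasting a bulk action metadata line that carries an externally provided _id with one that leaves id generation to Elasticsearch:

    import json

    # Illustration only: a bulk action metadata line with an externally provided _id
    # versus one that lets Elasticsearch generate the id automatically.
    with_external_id = json.dumps({"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}})
    auto_generated_id = json.dumps({"index": {"_index": "test_index", "_type": "test_type"}})

    print(with_external_id)
    print(auto_generated_id)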
docs/track.rst (2 additions, 2 deletions)
@@ -314,8 +314,8 @@ With the operation type ``bulk`` you can execute `bulk requests <http://www.elas
* ``indices`` (optional): A list of index names that defines which indices should be used by this bulk-index operation. Rally will then only select the documents files that have a matching ``target-index`` specified.
* ``batch-size`` (optional): Defines how many documents Rally will read at once. This is an expert setting and only meant to avoid accidental bottlenecks for very small bulk sizes (e.g. if you want to benchmark with a bulk-size of 1, you should set ``batch-size`` higher).
* ``pipeline`` (optional): Defines the name of an (existing) ingest pipeline that should be used (only supported from Elasticsearch 5.0).
-* ``conflicts`` (optional): Type of index conflicts to simulate. If not specified, no conflicts will be simulated. Valid values are: 'sequential' (A document id is replaced with a document id with a sequentially increasing id), 'random' (A document id is replaced with a document id with a random other id).
-* ``conflict-probability`` (optional, defaults to 25 percent): A number between (0, 100] that defines how many of the documents will get replaced.
+* ``conflicts`` (optional): Type of index conflicts to simulate. If not specified, no conflicts will be simulated (also read below on how to use external index ids with no conflicts). Valid values are: 'sequential' (A document id is replaced with a document id with a sequentially increasing id), 'random' (A document id is replaced with a document id with a random other id).
+* ``conflict-probability`` (optional, defaults to 25 percent): A number between [0, 100] that defines how many of the documents will get replaced. Combining ``conflicts=sequential`` and ``conflict-probability=0`` makes Rally generate index ids by itself, instead of relying on Elasticsearch's `automatic id generation <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_automatic_id_generation>`_.
* ``on-conflict`` (optional, defaults to ``index``): Determines whether Rally should use the action ``index`` or ``update`` on id conflicts.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results.

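For reference, a bulk operation exercising this new combination might be declared roughly as follows. This is a sketch expressed as a Python dict mirroring a track's JSON; the operation name and bulk size are invented, and only the parameters discussed above are shown:

    import json

    # Hypothetical operation snippet (name and bulk-size are invented): with
    # conflicts=sequential and conflict-probability=0, Rally generates the document
    # ids itself and never turns a document into an update.
    bulk_operation = {
        "name": "index-append-with-external-ids",
        "operation-type": "bulk",
        "bulk-size": 5000,
        "conflicts": "sequential",
        "conflict-probability": 0
    }

    print(json.dumps(bulk_operation, indent=2))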
esrally/track/params.py (4 additions, 2 deletions)
@@ -720,7 +720,7 @@ def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probabi
        self.conflicting_ids = conflicting_ids
        self.on_conflict = on_conflict
        # random() produces numbers between 0 and 1 and the user denotes the probability in percentage between 0 and 100.
-        self.conflict_probability = conflict_probability / 100.0 if conflict_probability else None
+        self.conflict_probability = conflict_probability / 100.0 if conflict_probability is not None else 0

        self.rand = rand
        self.randint = randint
@@ -731,10 +731,12 @@ def __iter__(self):

    def __next__(self):
        if self.conflicting_ids is not None:
-            if self.id_up_to > 0 and self.rand() <= self.conflict_probability:
+            if self.conflict_probability and self.id_up_to > 0 and self.rand() <= self.conflict_probability:
                doc_id = self.conflicting_ids[self.randint(0, self.id_up_to - 1)]
                action = self.on_conflict
            else:
+                if self.id_up_to >= len(self.conflicting_ids):
+                    raise StopIteration()
                doc_id = self.conflicting_ids[self.id_up_to]
                self.id_up_to += 1
                action = "index"
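The control flow above can be summarized with a standalone sketch (a simplified stand-in for GenerateActionMetaData, not the real class): with conflict_probability set to 0 the conflict condition short-circuits, every document receives the next externally provided id, and iteration stops once the id list is exhausted, mirroring the new StopIteration guard:

    import random

    def action_meta_data(conflicting_ids, conflict_probability, on_conflict="index"):
        # Simplified stand-in for GenerateActionMetaData.__next__ (illustration only).
        probability = conflict_probability / 100.0 if conflict_probability is not None else 0
        id_up_to = 0
        while True:
            # With conflict_probability == 0 the first operand is falsy, so the
            # conflict branch is never taken and ids are emitted strictly in order.
            if probability and id_up_to > 0 and random.random() <= probability:
                yield on_conflict, conflicting_ids[random.randint(0, id_up_to - 1)]
            else:
                if id_up_to >= len(conflicting_ids):
                    return  # equivalent to raising StopIteration in a generator
                yield "index", conflicting_ids[id_up_to]
                id_up_to += 1

    print(list(action_meta_data([100, 200, 300, 400], conflict_probability=0)))
    # expected: [('index', 100), ('index', 200), ('index', 300), ('index', 400)]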
tests/track/params_test.py (51 additions, 0 deletions)
@@ -151,6 +151,18 @@ def conflict(action, id):
        # and we're back to random
        self.assertEqual(conflict(conflict_action, "100"), next(generator))

+    def test_generate_action_meta_data_with_id_and_zero_conflict_probability(self):
+        def idx(id):
+            return "index", '{"index": {"_index": "test_index", "_type": "test_type", "_id": "%s"}}' % id
+
+        test_ids = [100, 200, 300, 400]
+
+        generator = params.GenerateActionMetaData("test_index", "test_type",
+                                                   conflicting_ids=test_ids,
+                                                   conflict_probability=0)
+
+        self.assertListEqual([idx(id) for id in test_ids], list(generator))

    def test_source_file_action_meta_data(self):
        source = params.Slice(io.StringAsFileSource, 0, 5)
        generator = params.SourceActionMetaData(source)
@@ -351,6 +363,45 @@ def test_read_bulk_with_id_conflicts(self):

        ], bulks)

+    def test_read_bulk_with_external_id_and_zero_conflict_probability(self):
+        data = [
+            '{"key": "value1"}',
+            '{"key": "value2"}',
+            '{"key": "value3"}',
+            '{"key": "value4"}'
+        ]
+        bulk_size = 2
+
+        source = params.Slice(io.StringAsFileSource, 0, len(data))
+        am_handler = params.GenerateActionMetaData("test_index", "test_type",
+                                                    conflicting_ids=[100, 200, 300, 400],
+                                                    conflict_probability=0)
+
+        reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler,
+                                        index_name="test_index", type_name="test_type")
+
+        # consume all bulks
+        bulks = []
+        with reader:
+            for index, type, batch in reader:
+                for bulk_size, bulk in batch:
+                    bulks.append(bulk)
+
+        self.assertEqual([
+            [
+                '{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}',
+                '{"key": "value1"}',
+                '{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}',
+                '{"key": "value2"}'
+            ],
+            [
+                '{"index": {"_index": "test_index", "_type": "test_type", "_id": "300"}}',
+                '{"key": "value3"}',
+                '{"index": {"_index": "test_index", "_type": "test_type", "_id": "400"}}',
+                '{"key": "value4"}'
+            ]
+        ], bulks)

    def assert_bulks_sized(self, reader, expected_bulk_sizes, expected_line_sizes):
        with reader:
            bulk_index = 0
