diff --git a/benchmarks/track/bulk_params_test.py b/benchmarks/track/bulk_params_test.py
new file mode 100644
index 000000000..e3a5997bc
--- /dev/null
+++ b/benchmarks/track/bulk_params_test.py
@@ -0,0 +1,130 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+import pytest
+
+from esrally.track import params
+
+
+class StaticSource:
+    def __init__(self, contents, mode, encoding="utf-8"):
+        self.contents = '{"geonameid": 2986043, "name": "Pic de Font Blanca", "asciiname": "Pic de Font Blanca"}'
+        self.current_index = 0
+        self.opened = False
+
+    def open(self):
+        self.opened = True
+        return self
+
+    def seek(self, offset):
+        pass
+
+    def read(self):
+        return "\n".join(self.contents)
+
+    def readline(self):
+        return self.contents
+
+    def readlines(self, num_lines):
+        return [self.contents] * num_lines
+
+    def close(self):
+        self._assert_opened()
+        self.contents = None
+        self.opened = False
+
+    def _assert_opened(self):
+        assert self.opened
+
+    def __enter__(self):
+        self.open()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+    def __str__(self, *args, **kwargs):
+        return "StaticSource"
+
+
+def create_reader(bulk_size):
+    metadata = params.GenerateActionMetaData(index_name="test-idx", type_name=None)
+
+    source = params.Slice(StaticSource, 0, sys.maxsize)
+    reader = params.MetadataIndexDataReader(data_file="bogus",
+                                            batch_size=bulk_size,
+                                            bulk_size=bulk_size,
+                                            file_source=source,
+                                            action_metadata=metadata,
+                                            index_name="test-idx",
+                                            type_name=None)
+    return reader
+
+
+@pytest.mark.benchmark(
+    group="bulk-params",
+    warmup=True,
+    warmup_iterations=10000,
+    disable_gc=True,
+)
+def test_index_data_reader_100(benchmark):
+    reader = create_reader(bulk_size=100)
+    reader.__enter__()
+    benchmark(reader.__next__)
+    reader.__exit__(None, None, None)
+
+
+@pytest.mark.benchmark(
+    group="bulk-params",
+    warmup=True,
+    warmup_iterations=10000,
+    disable_gc=True,
+)
+def test_index_data_reader_1000(benchmark):
+    reader = create_reader(bulk_size=1000)
+    reader.__enter__()
+    benchmark(reader.__next__)
+    reader.__exit__(None, None, None)
+
+
+@pytest.mark.benchmark(
+    group="bulk-params",
+    warmup=True,
+    warmup_iterations=10000,
+    disable_gc=True,
+)
+def test_index_data_reader_10000(benchmark):
+    reader = create_reader(bulk_size=10000)
+    reader.__enter__()
+    benchmark(reader.__next__)
+    reader.__exit__(None, None, None)
+
+
+@pytest.mark.benchmark(
+    group="bulk-params",
+    warmup=True,
+    warmup_iterations=10000,
+    disable_gc=True,
+)
+def test_index_data_reader_100000(benchmark):
+    reader = create_reader(bulk_size=100000)
+    reader.__enter__()
+    benchmark(reader.__next__)
+    reader.__exit__(None, None, None)
diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py
index 6fdfe150c..363812784 100644
--- a/esrally/driver/runner.py
+++ b/esrally/driver/runner.py
@@ -470,7 +470,14 @@ def detailed_stats(self, params, bulk_size, response):
         total_document_size_bytes = 0
         with_action_metadata = mandatory(params, "action-metadata-present", self)
 
-        for line_number, data in enumerate(params["body"]):
+        if isinstance(params["body"], str):
+            bulk_lines = params["body"].split("\n")
+        elif isinstance(params["body"], list):
+            bulk_lines = params["body"]
+        else:
+            raise exceptions.DataError("bulk body is neither string nor list")
+
+        for line_number, data in enumerate(bulk_lines):
             line_size = len(data.encode('utf-8'))
             if with_action_metadata:
                 if line_number % 2 == 1:
diff --git a/esrally/track/params.py b/esrally/track/params.py
index c7c079a14..4d9f7a118 100644
--- a/esrally/track/params.py
+++ b/esrally/track/params.py
@@ -571,7 +571,7 @@ def params(self):
         # self.internal_params always reads all files. This is necessary to ensure we terminate early in case
         # the user has specified ingest percentage.
         if self.current_bulk == self.total_bulks:
-            raise StopIteration
+            raise StopIteration()
         self.current_bulk += 1
         return next(self.internal_params)
 
@@ -647,13 +647,12 @@ def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_si
     source = Slice(io.FileSource, offset, num_lines)
 
     if docs.includes_action_and_meta_data:
-        am_handler = SourceActionMetaData(source)
+        return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, docs.target_index, docs.target_type)
     else:
         am_handler = GenerateActionMetaData(docs.target_index, docs.target_type,
                                             build_conflicting_ids(id_conflicts, num_docs, offset), conflict_probability,
                                             on_conflict, recency)
-
-    return IndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)
+        return MetadataIndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)
 
 
 def create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency,
@@ -758,15 +757,15 @@ class GenerateActionMetaData:
     def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probability=None, on_conflict=None,
                  recency=None, rand=random.random, randint=random.randint, randexp=random.expovariate):
         if type_name:
-            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
+            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
                                            (index_name, type_name, "%s")
-            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
+            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
                                             (index_name, type_name, "%s")
-            self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}' % (index_name, type_name)
+            self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}\n' % (index_name, type_name)
         else:
-            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
-            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
-            self.meta_data_index_no_id = '{"index": {"_index": "%s"}}' % index_name
+            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
+            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
+            self.meta_data_index_no_id = '{"index": {"_index": "%s"}}\n' % index_name
         self.conflicting_ids = conflicting_ids
         self.on_conflict = on_conflict
@@ -779,6 +778,13 @@ def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probabi
         self.randexp = randexp
         self.id_up_to = 0
 
+    @property
+    def is_constant(self):
+        """
+        :return: True iff the iterator will always return the same value.
+        """
+        return self.conflicting_ids is None
+
     def __iter__(self):
         return self
 
@@ -818,17 +824,6 @@ def __next__(self):
             return "index", self.meta_data_index_no_id
 
 
-class SourceActionMetaData:
-    def __init__(self, source):
-        self.source = source
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return "source", next(self.source)
-
-
 class Slice:
     def __init__(self, source_class, offset, number_of_lines):
         self.source_class = source_class
@@ -836,16 +831,18 @@ def __init__(self, source_class, offset, number_of_lines):
         self.offset = offset
         self.number_of_lines = number_of_lines
         self.current_line = 0
+        self.bulk_size = None
+        self.logger = logging.getLogger(__name__)
 
-    def open(self, file_name, mode):
-        logger = logging.getLogger(__name__)
+    def open(self, file_name, mode, bulk_size):
+        self.bulk_size = bulk_size
         self.source = self.source_class(file_name, mode).open()
-        # skip offset number of lines
-        logger.info("Skipping %d lines in [%s].", self.offset, file_name)
+        self.logger.info("Will read [%d] lines from [%s] starting from line [%d] with bulk size [%d].",
+                         self.number_of_lines, file_name, self.offset, self.bulk_size)
         start = time.perf_counter()
         io.skip_lines(file_name, self.source, self.offset)
         end = time.perf_counter()
-        logger.info("Skipping %d lines took %f s.", self.offset, end - start)
+        self.logger.debug("Skipping [%d] lines took [%f] s.", self.offset, end - start)
         return self
 
     def close(self):
@@ -859,11 +856,12 @@ def __next__(self):
         if self.current_line >= self.number_of_lines:
             raise StopIteration()
         else:
-            self.current_line += 1
-            line = self.source.readline()
-            if len(line) == 0:
+            # ensure we don't read past the allowed number of lines.
+            lines = self.source.readlines(min(self.bulk_size, self.number_of_lines - self.current_line))
+            self.current_line += len(lines)
+            if len(lines) == 0:
                 raise StopIteration()
-            return line.strip()
+            return lines
 
     def __str__(self):
         return "%s[%d;%d]" % (self.source, self.offset, self.offset + self.number_of_lines)
@@ -873,21 +871,20 @@ class IndexDataReader:
     """
     Reads a file in bulks into an array and also adds a meta-data line before each document if necessary.
 
-    This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N is any natural
-    number >= 1. This makes file reading more efficient for small bulk sizes.
+    This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N
+    is any natural number >= 1. This makes file reading more efficient for small bulk sizes.
""" - def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name): + def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name): self.data_file = data_file self.batch_size = batch_size self.bulk_size = bulk_size self.file_source = file_source - self.action_metadata = action_metadata self.index_name = index_name self.type_name = type_name def __enter__(self): - self.file_source.open(self.data_file, 'rt') + self.file_source.open(self.data_file, "rt", self.bulk_size) return self def __iter__(self): @@ -901,38 +898,86 @@ def __next__(self): try: docs_in_batch = 0 while docs_in_batch < self.batch_size: - docs_in_bulk, bulk = self.read_bulk() + try: + docs_in_bulk, bulk = self.read_bulk() + except StopIteration: + break if docs_in_bulk == 0: break docs_in_batch += docs_in_bulk - batch.append((docs_in_bulk, bulk)) + batch.append((docs_in_bulk, "".join(bulk))) if docs_in_batch == 0: raise StopIteration() return self.index_name, self.type_name, batch except IOError: logging.getLogger(__name__).exception("Could not read [%s]", self.data_file) - def read_bulk(self): - docs_in_bulk = 0 + def __exit__(self, exc_type, exc_val, exc_tb): + self.file_source.close() + return False + + +class MetadataIndexDataReader(IndexDataReader): + def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name): + super().__init__(data_file, batch_size, bulk_size, file_source, index_name, type_name) + self.action_metadata = action_metadata + self.action_metadata_line = None + + def __enter__(self): + super().__enter__() + if self.action_metadata.is_constant: + _, self.action_metadata_line = next(self.action_metadata) + self.read_bulk = self._read_bulk_fast + else: + self.read_bulk = self._read_bulk_regular + return self + + def _read_bulk_fast(self): + """ + Special-case implementation for bulk data files where the action and meta-data line is always identical. + """ current_bulk = [] - for action_metadata_item, document in zip(self.action_metadata, self.file_source): + # hoist + action_metadata_line = self.action_metadata_line + docs = next(self.file_source) + + for doc in docs: + current_bulk.append(action_metadata_line) + current_bulk.append(doc) + return len(docs), current_bulk + + def _read_bulk_regular(self): + """ + General case implementation for bulk files. This implementation can cover all cases but is slower when the + action and meta-data line is always identical. 
+ """ + current_bulk = [] + docs = next(self.file_source) + for doc in docs: + action_metadata_item = next(self.action_metadata) if action_metadata_item: action_type, action_metadata_line = action_metadata_item current_bulk.append(action_metadata_line) if action_type == "update": - current_bulk.append("{\"doc\":%s}" % document) + # remove the trailing "\n" as the doc needs to fit on one line + doc = doc.strip() + current_bulk.append("{\"doc\":%s}\n" % doc) else: - current_bulk.append(document) + current_bulk.append(doc) else: - current_bulk.append(document) - docs_in_bulk += 1 - if docs_in_bulk == self.bulk_size: - break - return docs_in_bulk, current_bulk + current_bulk.append(doc) + return len(docs), current_bulk - def __exit__(self, exc_type, exc_val, exc_tb): - self.file_source.close() - return False + +class SourceOnlyIndexDataReader(IndexDataReader): + def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name): + # keep batch size as it only considers documents read, not lines read but increase the bulk size as + # documents are only on every other line. + super().__init__(data_file, batch_size, bulk_size * 2, file_source, index_name, type_name) + + def read_bulk(self): + bulk_items = next(self.file_source) + return len(bulk_items) // 2, bulk_items register_param_source_for_operation(track.OperationType.Bulk, BulkIndexParamSource) diff --git a/esrally/utils/io.py b/esrally/utils/io.py index 79407f376..ea134126d 100644 --- a/esrally/utils/io.py +++ b/esrally/utils/io.py @@ -50,6 +50,16 @@ def read(self): def readline(self): return self.f.readline() + def readlines(self, num_lines): + lines = [] + f = self.f + for _ in range(num_lines): + line = f.readline() + if len(line) == 0: + break + lines.append(line) + return lines + def close(self): self.f.close() self.f = None @@ -115,6 +125,15 @@ def readline(self): self.current_index += 1 return line + def readlines(self, num_lines): + lines = [] + for _ in range(num_lines): + line = self.readline() + if len(line) == 0: + break + lines.append(line) + return lines + def close(self): self._assert_opened() self.contents = None diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index f19775445..9e603f925 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -138,14 +138,12 @@ def test_bulk_index_missing_params(self, es): bulk = runner.BulkIndex() bulk_params = { - "body": [ - "action_meta_data", - "index_line", - "action_meta_data", - "index_line", - "action_meta_data", - "index_line" - ] + "body": "action_meta_data\n" + + "index_line\n" + + "action_meta_data\n" + + "index_line\n" + + "action_meta_data\n" + + "index_line\n" } with self.assertRaises(exceptions.DataError) as ctx: @@ -161,14 +159,12 @@ def test_bulk_index_success_with_metadata(self, es): bulk = runner.BulkIndex() bulk_params = { - "body": [ - "action_meta_data", - "index_line", - "action_meta_data", - "index_line", - "action_meta_data", - "index_line" - ], + "body": "action_meta_data\n" + + "index_line\n" + + "action_meta_data\n" + + "index_line\n" + + "action_meta_data\n" + + "index_line\n", "action-metadata-present": True, "bulk-size": 3 } @@ -194,11 +190,9 @@ def test_bulk_index_success_without_metadata_with_doc_type(self, es): bulk = runner.BulkIndex() bulk_params = { - "body": [ - "index_line", - "index_line", - "index_line" - ], + "body": "index_line\n" + + "index_line\n" + + "index_line\n", "action-metadata-present": False, "bulk-size": 3, "index": "test-index", @@ -226,11 +220,9 @@ def 
         bulk = runner.BulkIndex()
 
         bulk_params = {
-            "body": [
-                "index_line",
-                "index_line",
-                "index_line"
-            ],
+            "body": "index_line\n" +
+                    "index_line\n" +
+                    "index_line\n",
             "action-metadata-present": False,
             "bulk-size": 3,
             "index": "test-index"
@@ -290,14 +282,12 @@ def test_bulk_index_error(self, es):
         bulk = runner.BulkIndex()
 
         bulk_params = {
-            "body": [
-                "action_meta_data",
-                "index_line",
-                "action_meta_data",
-                "index_line",
-                "action_meta_data",
-                "index_line"
-            ],
+            "body": "action_meta_data\n" +
+                    "index_line\n" +
+                    "action_meta_data\n" +
+                    "index_line\n" +
+                    "action_meta_data\n" +
+                    "index_line\n",
             "action-metadata-present": True,
             "bulk-size": 3,
             "index": "test"
@@ -354,14 +344,12 @@ def test_bulk_index_error_no_shards(self, es):
         bulk = runner.BulkIndex()
 
         bulk_params = {
-            "body": [
-                "action_meta_data",
-                "index_line",
-                "action_meta_data",
-                "index_line",
-                "action_meta_data",
-                "index_line",
-            ],
+            "body": "action_meta_data\n" +
+                    "index_line\n" +
+                    "action_meta_data\n" +
+                    "index_line\n" +
+                    "action_meta_data\n" +
+                    "index_line\n",
             "action-metadata-present": True,
             "detailed-results": False,
             "bulk-size": 3,
@@ -459,16 +447,14 @@ def test_mixed_bulk_with_simple_stats(self, es):
         bulk = runner.BulkIndex()
 
         bulk_params = {
-            "body": [
-                "action_meta_data",
-                "index_line",
-                "action_meta_data",
-                "update_line",
-                "action_meta_data",
-                "index_line",
-                "action_meta_data",
-                "update_line"
-            ],
+            "body": "action_meta_data\n" +
+                    "index_line\n" +
+                    "action_meta_data\n" +
+                    "update_line\n" +
+                    "action_meta_data\n" +
+                    "index_line\n" +
+                    "action_meta_data\n" +
+                    "update_line\n",
             "action-metadata-present": True,
             "detailed-results": False,
             "bulk-size": 4,
@@ -493,9 +479,8 @@ def test_mixed_bulk_with_simple_stats(self, es):
         result = bulk(es, bulk_params)
         self.assertNotIn("ingest_took", result)
 
-
     @mock.patch("elasticsearch.Elasticsearch")
-    def test_mixed_bulk_with_detailed_stats(self, es):
+    def test_mixed_bulk_with_detailed_stats_body_as_string(self, es):
         es.bulk.return_value = {
             "took": 30,
             "ingest_took": 20,
@@ -606,20 +591,18 @@ def test_mixed_bulk_with_detailed_stats(self, es):
         bulk = runner.BulkIndex()
 
         bulk_params = {
-            "body": [
-                '{ "index" : { "_index" : "test", "_type" : "type1" } }',
-                '{"location" : [-0.1485188, 51.5250666]}',
-                '{ "update" : { "_index" : "test", "_type" : "type1", "_id: "2" } }',
-                '{"location" : [-0.1479949, 51.5252071]}',
-                '{ "index" : { "_index" : "test", "_type" : "type1" } }',
-                '{"location" : [-0.1458559, 51.5289059]}',
-                '{ "index" : { "_index" : "test", "_type" : "type1" } }',
-                '{"location" : [-0.1498551, 51.5282564]}',
-                '{ "index" : { "_index" : "test", "_type" : "type1" } }',
-                '{"location" : [-0.1487043, 51.5254843]}',
-                '{ "update" : { "_index" : "test", "_type" : "type1", "_id: "3" } }',
-                '{"location" : [-0.1533367, 51.5261779]}'
-            ],
+            "body": '{ "index" : { "_index" : "test", "_type" : "type1" } }\n' +
+                    '{"location" : [-0.1485188, 51.5250666]}\n' +
+                    '{ "update" : { "_index" : "test", "_type" : "type1", "_id: "2" } }\n' +
+                    '{"location" : [-0.1479949, 51.5252071]}\n' +
+                    '{ "index" : { "_index" : "test", "_type" : "type1" } }\n' +
+                    '{"location" : [-0.1458559, 51.5289059]}\n' +
+                    '{ "index" : { "_index" : "test", "_type" : "type1" } }\n' +
+                    '{"location" : [-0.1498551, 51.5282564]}\n' +
+                    '{ "index" : { "_index" : "test", "_type" : "type1" } }\n' +
+                    '{"location" : [-0.1487043, 51.5254843]}\n' +
+                    '{ "update" : { "_index" : "test", "_type" : "type1", "_id: "3" } }\n' +
'{"location" : [-0.1533367, 51.5261779]}\n', "action-metadata-present": True, "bulk-size": 6, "detailed-results": True, @@ -687,6 +670,124 @@ def test_mixed_bulk_with_detailed_stats(self, es): result = bulk(es, bulk_params) self.assertNotIn("ingest_took", result) + @mock.patch("elasticsearch.Elasticsearch") + def test_simple_bulk_with_detailed_stats_body_as_list(self, es): + es.bulk.return_value = { + "took": 30, + "ingest_took": 20, + "errors": False, + "items": [ + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "1", + "_version": 1, + "result": "created", + "_shards": { + "total": 2, + "successful": 1, + "failed": 0 + }, + "created": True, + "status": 201, + "_seq_no": 0 + } + } + ] + } + bulk = runner.BulkIndex() + + bulk_params = { + "body": '{ "index" : { "_index" : "test", "_type" : "type1" } }\n' + + '{"location" : [-0.1485188, 51.5250666]}\n', + "action-metadata-present": True, + "bulk-size": 1, + "detailed-results": True, + "index": "test" + } + + result = bulk(es, bulk_params) + + self.assertEqual("test", result["index"]) + self.assertEqual(30, result["took"]) + self.assertEqual(20, result["ingest_took"]) + self.assertEqual(1, result["weight"]) + self.assertEqual(1, result["bulk-size"]) + self.assertEqual("docs", result["unit"]) + self.assertEqual(True, result["success"]) + self.assertEqual(0, result["error-count"]) + self.assertEqual( + { + "index": { + "item-count": 1, + "created": 1 + }, + }, result["ops"]) + self.assertEqual( + [ + { + "item-count": 1, + "shards": { + "total": 2, + "successful": 1, + "failed": 0 + } + } + ], result["shards_histogram"]) + self.assertEqual(93, result["bulk-request-size-bytes"]) + self.assertEqual(39, result["total-document-size-bytes"]) + + es.bulk.assert_called_with(body=bulk_params["body"], params={}) + + es.bulk.return_value.pop("ingest_took") + result = bulk(es, bulk_params) + self.assertNotIn("ingest_took", result) + + @mock.patch("elasticsearch.Elasticsearch") + def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, es): + es.bulk.return_value = { + "took": 30, + "ingest_took": 20, + "errors": False, + "items": [ + { + "index": { + "_index": "test", + "_type": "type1", + "_id": "1", + "_version": 1, + "result": "created", + "_shards": { + "total": 2, + "successful": 1, + "failed": 0 + }, + "created": True, + "status": 201, + "_seq_no": 0 + } + } + ] + } + bulk = runner.BulkIndex() + + bulk_params = { + "body": { + "items": '{ "index" : { "_index" : "test", "_type" : "type1" } }\n' + + '{"location" : [-0.1485188, 51.5250666]}\n', + }, + "action-metadata-present": True, + "bulk-size": 1, + "detailed-results": True, + "index": "test" + } + + with self.assertRaisesRegex(exceptions.DataError, "bulk body is neither string nor list"): + bulk(es, bulk_params) + + es.bulk.assert_called_with(body=bulk_params["body"], params={}) + class ForceMergeRunnerTests(TestCase): @mock.patch("elasticsearch.Elasticsearch") diff --git a/tests/track/params_test.py b/tests/track/params_test.py index 46d8a0adf..7ad57600d 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -61,8 +61,9 @@ def test_slice_with_source_larger_than_slice(self): '{"key": "value10"}' ] - source.open(data, "r") - self.assertEqual(data[2:7], list(source)) + source.open(data, "r", 5) + # lines are returned as a list so we have to wrap our data once more + self.assertEqual([data[2:7]], list(source)) source.close() def test_slice_with_slice_larger_than_source(self): @@ -73,8 +74,9 @@ def test_slice_with_slice_larger_than_source(self): 
'{"key": "value3"}', ] - source.open(data, "r") - self.assertEqual(data, list(source)) + source.open(data, "r", 5) + # lines are returned as a list so we have to wrap our data once more + self.assertEqual([data], list(source)) source.close() @@ -142,19 +144,19 @@ def test_random_conflicts(self): class ActionMetaDataTests(TestCase): def test_generate_action_meta_data_without_id_conflicts(self): - self.assertEqual(("index", '{"index": {"_index": "test_index", "_type": "test_type"}}'), + self.assertEqual(("index", '{"index": {"_index": "test_index", "_type": "test_type"}}\n'), next(params.GenerateActionMetaData("test_index", "test_type"))) def test_generate_action_meta_data_typeless(self): - self.assertEqual(("index", '{"index": {"_index": "test_index"}}'), + self.assertEqual(("index", '{"index": {"_index": "test_index"}}\n'), next(params.GenerateActionMetaData("test_index", type_name=None))) def test_generate_action_meta_data_with_id_conflicts(self): def idx(id): - return "index", '{"index": {"_index": "test_index", "_type": "test_type", "_id": "%s"}}' % id + return "index", '{"index": {"_index": "test_index", "_type": "test_type", "_id": "%s"}}\n' % id def conflict(action, id): - return action, '{"%s": {"_index": "test_index", "_type": "test_type", "_id": "%s"}}' % (action, id) + return action, '{"%s": {"_index": "test_index", "_type": "test_type", "_id": "%s"}}\n' % (action, id) pseudo_random_conflicts = iter([ # if this value is <= our chosen threshold of 0.25 (see conflict_probability) we produce a conflict. @@ -197,15 +199,15 @@ def conflict(action, id): def test_generate_action_meta_data_with_id_conflicts_and_recency_bias(self): def idx(type_name, id): if type_name: - return "index", '{"index": {"_index": "test_index", "_type": "%s", "_id": "%s"}}' % (type_name, id) + return "index", '{"index": {"_index": "test_index", "_type": "%s", "_id": "%s"}}\n' % (type_name, id) else: - return "index", '{"index": {"_index": "test_index", "_id": "%s"}}' % id + return "index", '{"index": {"_index": "test_index", "_id": "%s"}}\n' % id def conflict(action, type_name, id): if type_name: - return action, '{"%s": {"_index": "test_index", "_type": "%s", "_id": "%s"}}' % (action, type_name, id) + return action, '{"%s": {"_index": "test_index", "_type": "%s", "_id": "%s"}}\n' % (action, type_name, id) else: - return action, '{"%s": {"_index": "test_index", "_id": "%s"}}' % (action, id) + return action, '{"%s": {"_index": "test_index", "_id": "%s"}}\n' % (action, id) pseudo_random_conflicts = iter([ # if this value is <= our chosen threshold of 0.25 (see conflict_probability) we produce a conflict. 
@@ -270,7 +272,7 @@ def conflict(action, type_name, id):
 
     def test_generate_action_meta_data_with_id_and_zero_conflict_probability(self):
         def idx(id):
-            return "index", '{"index": {"_index": "test_index", "_type": "test_type", "_id": "%s"}}' % id
+            return "index", '{"index": {"_index": "test_index", "_type": "test_type", "_id": "%s"}}\n' % id
 
         test_ids = [100, 200, 300, 400]
 
@@ -280,39 +282,28 @@ def idx(id):
 
         self.assertListEqual([idx(id) for id in test_ids], list(generator))
 
-    def test_source_file_action_meta_data(self):
-        source = params.Slice(io.StringAsFileSource, 0, 5)
-        generator = params.SourceActionMetaData(source)
-
-        data = [
-            '{"index": {"_index": "test_index", "_type": "test_type", "_id": "1"}}',
-            '{"index": {"_index": "test_index", "_type": "test_type", "_id": "2"}}',
-            '{"index": {"_index": "test_index", "_type": "test_type", "_id": "3"}}',
-            '{"index": {"_index": "test_index", "_type": "test_type", "_id": "4"}}',
-            '{"index": {"_index": "test_index", "_type": "test_type", "_id": "5"}}',
-        ]
-
-        source.open(data, "r")
-        self.assertEqual([("source", doc) for doc in data], list(generator))
-        source.close()
-
 
 class IndexDataReaderTests(TestCase):
     def test_read_bulk_larger_than_number_of_docs(self):
         data = [
-            '{"key": "value1"}',
-            '{"key": "value2"}',
-            '{"key": "value3"}',
-            '{"key": "value4"}',
-            '{"key": "value5"}'
+            '{"key": "value1"}\n',
+            '{"key": "value2"}\n',
+            '{"key": "value3"}\n',
+            '{"key": "value4"}\n',
+            '{"key": "value5"}\n'
         ]
 
         bulk_size = 50
 
         source = params.Slice(io.StringAsFileSource, 0, len(data))
         am_handler = params.GenerateActionMetaData("test_index", "test_type")
-        reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler,
-                                        index_name="test_index", type_name="test_type")
+        reader = params.MetadataIndexDataReader(data,
+                                                batch_size=bulk_size,
+                                                bulk_size=bulk_size,
+                                                file_source=source,
+                                                action_metadata=am_handler,
+                                                index_name="test_index",
+                                                type_name="test_type")
 
         expected_bulk_sizes = [len(data)]
         # lines should include meta-data
@@ -321,19 +312,24 @@ def test_read_bulk_with_offset(self):
         data = [
-            '{"key": "value1"}',
-            '{"key": "value2"}',
-            '{"key": "value3"}',
-            '{"key": "value4"}',
-            '{"key": "value5"}'
+            '{"key": "value1"}\n',
+            '{"key": "value2"}\n',
+            '{"key": "value3"}\n',
+            '{"key": "value4"}\n',
+            '{"key": "value5"}\n'
         ]
 
         bulk_size = 50
 
         source = params.Slice(io.StringAsFileSource, 3, len(data))
         am_handler = params.GenerateActionMetaData("test_index", "test_type")
-        reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler,
-                                        index_name="test_index", type_name="test_type")
+        reader = params.MetadataIndexDataReader(data,
+                                                batch_size=bulk_size,
+                                                bulk_size=bulk_size,
+                                                file_source=source,
+                                                action_metadata=am_handler,
+                                                index_name="test_index",
+                                                type_name="test_type")
 
         expected_bulk_sizes = [(len(data) - 3)]
         # lines should include meta-data
@@ -342,21 +338,26 @@ def test_read_bulk_smaller_than_number_of_docs(self):
         data = [
-            '{"key": "value1"}',
-            '{"key": "value2"}',
-            '{"key": "value3"}',
-            '{"key": "value4"}',
-            '{"key": "value5"}',
-            '{"key": "value6"}',
-            '{"key": "value7"}',
+            '{"key": "value1"}\n',
+            '{"key": "value2"}\n',
+            '{"key": "value3"}\n',
+            '{"key": "value4"}\n',
+            '{"key": "value5"}\n',
+            '{"key": "value6"}\n',
+            '{"key": "value7"}\n',
         ]
 
         bulk_size = 3
 
         source = params.Slice(io.StringAsFileSource, 0, len(data))
         am_handler = params.GenerateActionMetaData("test_index", "test_type")
-        reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler,
-                                        index_name="test_index", type_name="test_type")
+        reader = params.MetadataIndexDataReader(data,
+                                                batch_size=bulk_size,
+                                                bulk_size=bulk_size,
+                                                file_source=source,
+                                                action_metadata=am_handler,
+                                                index_name="test_index",
+                                                type_name="test_type")
 
         expected_bulk_sizes = [3, 3, 1]
         # lines should include meta-data
@@ -365,13 +366,13 @@ def test_read_bulk_smaller_than_number_of_docs_and_multiple_clients(self):
         data = [
-            '{"key": "value1"}',
-            '{"key": "value2"}',
-            '{"key": "value3"}',
-            '{"key": "value4"}',
-            '{"key": "value5"}',
-            '{"key": "value6"}',
-            '{"key": "value7"}',
+            '{"key": "value1"}\n',
+            '{"key": "value2"}\n',
+            '{"key": "value3"}\n',
+            '{"key": "value4"}\n',
+            '{"key": "value5"}\n',
+            '{"key": "value6"}\n',
+            '{"key": "value7"}\n',
         ]
 
         bulk_size = 3
@@ -379,8 +380,13 @@ def test_read_bulk_smaller_than_number_of_docs_and_multiple_clients(self):
         source = params.Slice(io.StringAsFileSource, 0, 5)
         am_handler = params.GenerateActionMetaData("test_index", "test_type")
-        reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler,
-                                        index_name="test_index", type_name="test_type")
+        reader = params.MetadataIndexDataReader(data,
+                                                batch_size=bulk_size,
+                                                bulk_size=bulk_size,
+                                                file_source=source,
+                                                action_metadata=am_handler,
+                                                index_name="test_index",
+                                                type_name="test_type")
 
         expected_bulk_sizes = [3, 2]
         # lines should include meta-data
@@ -389,28 +395,31 @@ def test_read_bulks_and_assume_metadata_line_in_source_file(self):
         data = [
-            '{"index": {"_index": "test_index", "_type": "test_type"}',
-            '{"key": "value1"}',
-            '{"index": {"_index": "test_index", "_type": "test_type"}',
-            '{"key": "value2"}',
-            '{"index": {"_index": "test_index", "_type": "test_type"}',
-            '{"key": "value3"}',
-            '{"index": {"_index": "test_index", "_type": "test_type"}',
-            '{"key": "value4"}',
-            '{"index": {"_index": "test_index", "_type": "test_type"}',
-            '{"key": "value5"}',
-            '{"index": {"_index": "test_index", "_type": "test_type"}',
-            '{"key": "value6"}',
-            '{"index": {"_index": "test_index", "_type": "test_type"}',
-            '{"key": "value7"}'
+            '{"index": {"_index": "test_index", "_type": "test_type"}\n',
+            '{"key": "value1"}\n',
+            '{"index": {"_index": "test_index", "_type": "test_type"}\n',
+            '{"key": "value2"}\n',
+            '{"index": {"_index": "test_index", "_type": "test_type"}\n',
+            '{"key": "value3"}\n',
+            '{"index": {"_index": "test_index", "_type": "test_type"}\n',
+            '{"key": "value4"}\n',
+            '{"index": {"_index": "test_index", "_type": "test_type"}\n',
+            '{"key": "value5"}\n',
+            '{"index": {"_index": "test_index", "_type": "test_type"}\n',
+            '{"key": "value6"}\n',
+            '{"index": {"_index": "test_index", "_type": "test_type"}\n',
+            '{"key": "value7"}\n'
         ]
 
         bulk_size = 3
 
         source = params.Slice(io.StringAsFileSource, 0, len(data))
-        am_handler = params.SourceActionMetaData(source)
-        reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler,
-                                        index_name="test_index", type_name="test_type")
+        reader = params.SourceOnlyIndexDataReader(data,
+                                                  batch_size=bulk_size,
+                                                  bulk_size=bulk_size,
+                                                  file_source=source,
+                                                  index_name="test_index",
+                                                  type_name="test_type")
type_name="test_type") expected_bulk_sizes = [3, 3, 1] # lines should include meta-data @@ -434,11 +443,11 @@ def test_read_bulk_with_id_conflicts(self): 2]) data = [ - '{"key": "value1"}', - '{"key": "value2"}', - '{"key": "value3"}', - '{"key": "value4"}', - '{"key": "value5"}' + '{"key": "value1"}\n', + '{"key": "value2"}\n', + '{"key": "value3"}\n', + '{"key": "value4"}\n', + '{"key": "value5"}\n' ] bulk_size = 2 @@ -450,8 +459,13 @@ def test_read_bulk_with_id_conflicts(self): rand=lambda: next(pseudo_random_conflicts), randint=lambda x, y: next(chosen_index_of_conflicting_ids)) - reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler, - index_name="test_index", type_name="test_type") + reader = params.MetadataIndexDataReader(data, + batch_size=bulk_size, + bulk_size=bulk_size, + file_source=source, + action_metadata=am_handler, + index_name="test_index", + type_name="test_type") # consume all bulks bulks = [] @@ -461,31 +475,24 @@ def test_read_bulk_with_id_conflicts(self): bulks.append(bulk) self.assertEqual([ - [ - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}', - '{"key": "value1"}', - '{"update": {"_index": "test_index", "_type": "test_type", "_id": "200"}}', - '{"doc":{"key": "value2"}}' - ], - [ - '{"update": {"_index": "test_index", "_type": "test_type", "_id": "400"}}', - '{"doc":{"key": "value3"}}', - '{"update": {"_index": "test_index", "_type": "test_type", "_id": "300"}}', - '{"doc":{"key": "value4"}}' - ], - [ - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}', - '{"key": "value5"}' - ] - + '{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}\n' + + '{"key": "value1"}\n' + + '{"update": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + + '{"doc":{"key": "value2"}}\n', + '{"update": {"_index": "test_index", "_type": "test_type", "_id": "400"}}\n' + + '{"doc":{"key": "value3"}}\n' + + '{"update": {"_index": "test_index", "_type": "test_type", "_id": "300"}}\n' + + '{"doc":{"key": "value4"}}\n', + '{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + + '{"key": "value5"}\n' ], bulks) def test_read_bulk_with_external_id_and_zero_conflict_probability(self): data = [ - '{"key": "value1"}', - '{"key": "value2"}', - '{"key": "value3"}', - '{"key": "value4"}' + '{"key": "value1"}\n', + '{"key": "value2"}\n', + '{"key": "value3"}\n', + '{"key": "value4"}\n' ] bulk_size = 2 @@ -494,8 +501,13 @@ def test_read_bulk_with_external_id_and_zero_conflict_probability(self): conflicting_ids=[100, 200, 300, 400], conflict_probability=0) - reader = params.IndexDataReader(data, batch_size=bulk_size, bulk_size=bulk_size, file_source=source, action_metadata=am_handler, - index_name="test_index", type_name="test_type") + reader = params.MetadataIndexDataReader(data, + batch_size=bulk_size, + bulk_size=bulk_size, + file_source=source, + action_metadata=am_handler, + index_name="test_index", + type_name="test_type") # consume all bulks bulks = [] @@ -505,28 +517,27 @@ def test_read_bulk_with_external_id_and_zero_conflict_probability(self): bulks.append(bulk) self.assertEqual([ - [ - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}', - '{"key": "value1"}', - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}', - '{"key": "value2"}' - ], - [ - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "300"}}', - '{"key": "value3"}', - '{"index": {"_index": 
"test_index", "_type": "test_type", "_id": "400"}}', - '{"key": "value4"}' - ] + '{"index": {"_index": "test_index", "_type": "test_type", "_id": "100"}}\n' + + '{"key": "value1"}\n' + + '{"index": {"_index": "test_index", "_type": "test_type", "_id": "200"}}\n' + + '{"key": "value2"}\n', + + '{"index": {"_index": "test_index", "_type": "test_type", "_id": "300"}}\n' + + '{"key": "value3"}\n' + + '{"index": {"_index": "test_index", "_type": "test_type", "_id": "400"}}\n' + + '{"key": "value4"}\n' ], bulks) def assert_bulks_sized(self, reader, expected_bulk_sizes, expected_line_sizes): + self.assertEqual(len(expected_bulk_sizes), len(expected_line_sizes), "Bulk sizes and line sizes must be equal") with reader: bulk_index = 0 for index, type, batch in reader: for bulk_size, bulk in batch: - self.assertEqual(expected_bulk_sizes[bulk_index], bulk_size) - self.assertEqual(expected_line_sizes[bulk_index], len(bulk)) + self.assertEqual(expected_bulk_sizes[bulk_index], bulk_size, msg="bulk size") + self.assertEqual(expected_line_sizes[bulk_index], bulk.count("\n")) bulk_index += 1 + self.assertEqual(len(expected_bulk_sizes), bulk_index, "Not all bulk sizes have been checked") class InvocationGeneratorTests(TestCase):