Speed up client-side bulk-handling (#890)
With this commit we speed up preparing bulk requests from data files by
implementing several optimizations:

* Bulk data are passed to the runner as a single string instead of a list. This
avoids the cost of converting the raw list to a string inside the Python
Elasticsearch client (see the sketch after this list).
* Lines are read in bulk from the data source instead of line by line.
This avoids many method calls.
* We provide a special implementation for the common case (ids are
autogenerated, no conflicts) to make the hot code path as simple as
possible.
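
A minimal sketch of the first optimization (the index name and documents below are made up for illustration; this is not Rally's actual runner code). The Python Elasticsearch client accepts a bulk body either as a list of lines or as a single newline-delimited string, so joining the lines once while reading the data file avoids a per-request join inside the client:

```python
# Hypothetical action/meta-data line and documents, for illustration only.
action = '{"index": {"_index": "test-idx"}}\n'
docs = ['{"name": "doc-%d"}\n' % i for i in range(3)]

# Previous approach: hand the client a list of lines; the client joins them for every request.
body_as_list = [line for doc in docs for line in (action, doc)]

# New approach: join once while reading the data file and pass a single string through.
body_as_string = "".join(line for doc in docs for line in (action, doc))

# Both forms are accepted by the client's bulk API (e.g. es.bulk(body=body_as_string)),
# but the string form skips the client-side serialization step.
```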

This commit also adds a microbenchmark that measures the speedup. The
following table shows a comparison of the throughput of the bulk reader
for various bulk sizes:

| Bulk Size | master [ops/s] | This PR [ops/s] | Speedup |
|-----------|----------------|-----------------|---------|
| 100       | 14829          | 92395           | 6.23    |
| 1000      | 1448           | 10953           | 7.56    |
| 10000     | 148            | 1100            | 7.43    |
| 100000    | 15             | 107             | 7.13    |

All data have been measured using Python 3.8 on Linux.
danielmitterdorfer authored Feb 12, 2020
1 parent ddcf7db commit 4044259
Showing 6 changed files with 554 additions and 241 deletions.
130 changes: 130 additions & 0 deletions benchmarks/track/bulk_params_test.py
@@ -0,0 +1,130 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import sys

import pytest

from esrally.track import params


class StaticSource:
    def __init__(self, contents, mode, encoding="utf-8"):
        self.contents = '{"geonameid": 2986043, "name": "Pic de Font Blanca", "asciiname": "Pic de Font Blanca"}'
        self.current_index = 0
        self.opened = False

    def open(self):
        self.opened = True
        return self

    def seek(self, offset):
        pass

    def read(self):
        return "\n".join(self.contents)

    def readline(self):
        return self.contents

    def readlines(self, num_lines):
        return [self.contents] * num_lines

    def close(self):
        self._assert_opened()
        self.contents = None
        self.opened = False

    def _assert_opened(self):
        assert self.opened

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def __str__(self, *args, **kwargs):
        return "StaticSource"


def create_reader(bulk_size):
    metadata = params.GenerateActionMetaData(index_name="test-idx", type_name=None)

    source = params.Slice(StaticSource, 0, sys.maxsize)
    reader = params.MetadataIndexDataReader(data_file="bogus",
                                            batch_size=bulk_size,
                                            bulk_size=bulk_size,
                                            file_source=source,
                                            action_metadata=metadata,
                                            index_name="test-idx",
                                            type_name=None)
    return reader


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_100(benchmark):
    reader = create_reader(bulk_size=100)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_1000(benchmark):
    reader = create_reader(bulk_size=1000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_10000(benchmark):
    reader = create_reader(bulk_size=10000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_100000(benchmark):
    reader = create_reader(bulk_size=100000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)
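
The benchmarks above rely on the `benchmark` fixture from the pytest-benchmark plugin. Assuming that plugin is installed, one possible way to run just this file is:

```python
# Run only the bulk-params microbenchmarks in this file; --benchmark-only is a
# pytest-benchmark option that skips tests which are not benchmarks.
import pytest

pytest.main(["benchmarks/track/bulk_params_test.py", "--benchmark-only"])
```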
9 changes: 8 additions & 1 deletion esrally/driver/runner.py
@@ -470,7 +470,14 @@ def detailed_stats(self, params, bulk_size, response):
total_document_size_bytes = 0
with_action_metadata = mandatory(params, "action-metadata-present", self)

for line_number, data in enumerate(params["body"]):
if isinstance(params["body"], str):
bulk_lines = params["body"].split("\n")
elif isinstance(params["body"], list):
bulk_lines = params["body"]
else:
raise exceptions.DataError("bulk body is neither string nor list")

for line_number, data in enumerate(bulk_lines):
line_size = len(data.encode('utf-8'))
if with_action_metadata:
if line_number % 2 == 1:
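
The hunk above is truncated by the diff view. As a simplified sketch of the idea (not Rally's actual detailed_stats implementation), per-line statistics can be computed the same way whether the runner receives the bulk body as a newline-delimited string or as a list of lines:

```python
def bulk_stats(body, with_action_metadata=True):
    """Simplified sketch: count documents and bytes in a bulk body of either representation."""
    if isinstance(body, str):
        lines = body.split("\n")
    elif isinstance(body, list):
        lines = body
    else:
        raise TypeError("bulk body is neither string nor list")

    docs, total_bytes = 0, 0
    for line_number, line in enumerate(lines):
        if not line:
            continue  # a trailing newline in a string body yields an empty final element
        total_bytes += len(line.encode("utf-8"))
        # with action/meta-data lines present, documents sit on every other (odd) line
        if not with_action_metadata or line_number % 2 == 1:
            docs += 1
    return docs, total_bytes
```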
145 changes: 95 additions & 50 deletions esrally/track/params.py
@@ -571,7 +571,7 @@ def params(self):
# self.internal_params always reads all files. This is necessary to ensure we terminate early in case
# the user has specified ingest percentage.
if self.current_bulk == self.total_bulks:
raise StopIteration
raise StopIteration()
self.current_bulk += 1
return next(self.internal_params)

@@ -647,13 +647,12 @@ def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_si
source = Slice(io.FileSource, offset, num_lines)

if docs.includes_action_and_meta_data:
am_handler = SourceActionMetaData(source)
return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, docs.target_index, docs.target_type)
else:
am_handler = GenerateActionMetaData(docs.target_index, docs.target_type,
build_conflicting_ids(id_conflicts, num_docs, offset), conflict_probability,
on_conflict, recency)

return IndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)
return MetadataIndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)


def create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency,
Expand Down Expand Up @@ -758,15 +757,15 @@ class GenerateActionMetaData:
def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probability=None, on_conflict=None,
recency=None, rand=random.random, randint=random.randint, randexp=random.expovariate):
if type_name:
self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
(index_name, type_name, "%s")
self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
(index_name, type_name, "%s")
self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}' % (index_name, type_name)
self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}\n' % (index_name, type_name)
else:
self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
self.meta_data_index_no_id = '{"index": {"_index": "%s"}}' % index_name
self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
self.meta_data_index_no_id = '{"index": {"_index": "%s"}}\n' % index_name

self.conflicting_ids = conflicting_ids
self.on_conflict = on_conflict
@@ -779,6 +778,13 @@ def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probabi
self.randexp = randexp
self.id_up_to = 0

@property
def is_constant(self):
"""
:return: True iff the iterator will always return the same value.
"""
return self.conflicting_ids is None

def __iter__(self):
return self

@@ -818,34 +824,25 @@ def __next__(self):
return "index", self.meta_data_index_no_id


class SourceActionMetaData:
def __init__(self, source):
self.source = source

def __iter__(self):
return self

def __next__(self):
return "source", next(self.source)


class Slice:
def __init__(self, source_class, offset, number_of_lines):
self.source_class = source_class
self.source = None
self.offset = offset
self.number_of_lines = number_of_lines
self.current_line = 0
self.bulk_size = None
self.logger = logging.getLogger(__name__)

def open(self, file_name, mode):
logger = logging.getLogger(__name__)
def open(self, file_name, mode, bulk_size):
self.bulk_size = bulk_size
self.source = self.source_class(file_name, mode).open()
# skip offset number of lines
logger.info("Skipping %d lines in [%s].", self.offset, file_name)
self.logger.info("Will read [%d] lines from [%s] starting from line [%d] with bulk size [%d].",
self.number_of_lines, file_name, self.offset, self.bulk_size)
start = time.perf_counter()
io.skip_lines(file_name, self.source, self.offset)
end = time.perf_counter()
logger.info("Skipping %d lines took %f s.", self.offset, end - start)
self.logger.debug("Skipping [%d] lines took [%f] s.", self.offset, end - start)
return self

def close(self):
@@ -859,11 +856,12 @@ def __next__(self):
if self.current_line >= self.number_of_lines:
raise StopIteration()
else:
self.current_line += 1
line = self.source.readline()
if len(line) == 0:
# ensure we don't read past the allowed number of lines.
lines = self.source.readlines(min(self.bulk_size, self.number_of_lines - self.current_line))
self.current_line += len(lines)
if len(lines) == 0:
raise StopIteration()
return line.strip()
return lines

def __str__(self):
return "%s[%d;%d]" % (self.source, self.offset, self.offset + self.number_of_lines)
@@ -873,21 +871,20 @@ class IndexDataReader:
"""
Reads a file in bulks into an array and also adds a meta-data line before each document if necessary.
This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N is any natural
number >= 1. This makes file reading more efficient for small bulk sizes.
This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N
is any natural number >= 1. This makes file reading more efficient for small bulk sizes.
"""

def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name):
def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name):
self.data_file = data_file
self.batch_size = batch_size
self.bulk_size = bulk_size
self.file_source = file_source
self.action_metadata = action_metadata
self.index_name = index_name
self.type_name = type_name

def __enter__(self):
self.file_source.open(self.data_file, 'rt')
self.file_source.open(self.data_file, "rt", self.bulk_size)
return self

def __iter__(self):
@@ -901,38 +898,86 @@ def __next__(self):
try:
docs_in_batch = 0
while docs_in_batch < self.batch_size:
docs_in_bulk, bulk = self.read_bulk()
try:
docs_in_bulk, bulk = self.read_bulk()
except StopIteration:
break
if docs_in_bulk == 0:
break
docs_in_batch += docs_in_bulk
batch.append((docs_in_bulk, bulk))
batch.append((docs_in_bulk, "".join(bulk)))
if docs_in_batch == 0:
raise StopIteration()
return self.index_name, self.type_name, batch
except IOError:
logging.getLogger(__name__).exception("Could not read [%s]", self.data_file)

def read_bulk(self):
docs_in_bulk = 0
def __exit__(self, exc_type, exc_val, exc_tb):
self.file_source.close()
return False


class MetadataIndexDataReader(IndexDataReader):
def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name):
super().__init__(data_file, batch_size, bulk_size, file_source, index_name, type_name)
self.action_metadata = action_metadata
self.action_metadata_line = None

def __enter__(self):
super().__enter__()
if self.action_metadata.is_constant:
_, self.action_metadata_line = next(self.action_metadata)
self.read_bulk = self._read_bulk_fast
else:
self.read_bulk = self._read_bulk_regular
return self

def _read_bulk_fast(self):
"""
Special-case implementation for bulk data files where the action and meta-data line is always identical.
"""
current_bulk = []
for action_metadata_item, document in zip(self.action_metadata, self.file_source):
# hoist the attribute lookup out of the hot loop
action_metadata_line = self.action_metadata_line
docs = next(self.file_source)

for doc in docs:
current_bulk.append(action_metadata_line)
current_bulk.append(doc)
return len(docs), current_bulk

def _read_bulk_regular(self):
"""
General case implementation for bulk files. This implementation can cover all cases but is slower when the
action and meta-data line is always identical.
"""
current_bulk = []
docs = next(self.file_source)
for doc in docs:
action_metadata_item = next(self.action_metadata)
if action_metadata_item:
action_type, action_metadata_line = action_metadata_item
current_bulk.append(action_metadata_line)
if action_type == "update":
current_bulk.append("{\"doc\":%s}" % document)
# remove the trailing "\n" as the doc needs to fit on one line
doc = doc.strip()
current_bulk.append("{\"doc\":%s}\n" % doc)
else:
current_bulk.append(document)
current_bulk.append(doc)
else:
current_bulk.append(document)
docs_in_bulk += 1
if docs_in_bulk == self.bulk_size:
break
return docs_in_bulk, current_bulk
current_bulk.append(doc)
return len(docs), current_bulk

def __exit__(self, exc_type, exc_val, exc_tb):
self.file_source.close()
return False

class SourceOnlyIndexDataReader(IndexDataReader):
def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name):
# keep batch size as it only considers documents read, not lines read but increase the bulk size as
# documents are only on every other line.
super().__init__(data_file, batch_size, bulk_size * 2, file_source, index_name, type_name)

def read_bulk(self):
bulk_items = next(self.file_source)
return len(bulk_items) // 2, bulk_items


register_param_source_for_operation(track.OperationType.Bulk, BulkIndexParamSource)
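
For orientation, a usage sketch modeled on create_reader in the new microbenchmark: the document source below is a tiny, hypothetical stand-in (not Rally's io.FileSource), and all values are illustrative.

```python
import sys

from esrally.track import params


class SingleDocSource:
    """Hypothetical stand-in for Rally's io.FileSource; serves one repeated document line."""

    def __init__(self, file_name, mode, encoding="utf-8"):
        self.line = '{"name": "Pic de Font Blanca"}\n'

    def open(self):
        return self

    def seek(self, offset):
        pass

    def readline(self):
        return self.line

    def readlines(self, num_lines):
        return [self.line] * num_lines

    def close(self):
        pass


metadata = params.GenerateActionMetaData(index_name="test-idx", type_name=None)
source = params.Slice(SingleDocSource, 0, sys.maxsize)
reader = params.MetadataIndexDataReader(data_file="bogus",
                                        batch_size=100,
                                        bulk_size=100,
                                        file_source=source,
                                        action_metadata=metadata,
                                        index_name="test-idx",
                                        type_name=None)

with reader:
    index_name, type_name, batch = next(reader)
    num_docs, bulk_body = batch[0]
    # num_docs == 100; bulk_body is a single newline-delimited string consisting of one
    # action/meta-data line followed by one document line per indexed document.
```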