Skip to content

Commit

Permalink
Allow unlimited number of pages for scroll queries
Browse files Browse the repository at this point in the history
Closes #223
  • Loading branch information
danielmitterdorfer committed May 9, 2017
1 parent 8df775f commit 84d6bf1
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ With the operation type ``search`` you can execute `request body searches <http:
* ``type`` (optional): Defines the type within the specified index for this query.
* ``cache`` (optional): Whether to use the query request cache. By default, Rally will define no value thus the default depends on the benchmark candidate settings and Elasticsearch version.
* ``body`` (mandatory): The query body.
* ``pages`` (optional): Number of pages to retrieve. If this parameter is present, a scroll query will be executed.
* ``pages`` (optional): Number of pages to retrieve. If this parameter is present, a scroll query will be executed. If you want to retrieve all result pages, use the value "all".
* ``results-per-page`` (optional): Number of documents to retrieve per page for scroll queries.

Example::
Expand Down
7 changes: 6 additions & 1 deletion esrally/driver/runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
import types
import logging
from collections import Counter, OrderedDict
Expand Down Expand Up @@ -411,7 +412,11 @@ def scroll_query(self, es, params):
else:
# This should only happen if we concurrently create an index and start searching
self.scroll_id = None
total_pages = params["pages"]
if params["pages"] == "all":
total_pages = sys.maxsize
else:
# explicitly convert to int to provoke an error otherwise
total_pages = int(params["pages"])
# Note that starting with ES 2.0, the initial call to search() returns already the first result page
# so we have to retrieve one page less
for page in range(total_pages - 1):
Expand Down
224 changes: 223 additions & 1 deletion tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from esrally.driver import runner


class RegisterRunnerTest(TestCase):
class RegisterRunnerTests(TestCase):
def tearDown(self):
runner.remove_runner("unit_test")

Expand Down Expand Up @@ -459,3 +459,225 @@ def test_mixed_bulk_with_detailed_stats(self, es):
], result["shards_histogram"])

es.bulk.assert_called_with(body=bulk_params["body"], params={})


class QueryRunnerTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
def test_query_match_all(self, es):
es.search.return_value = {
"hits": {
"hits": [
{
"some-doc-1"
}
]
}
}

query_runner = runner.Query()

params = {
"index": "unittest",
"type": "type",
"use_request_cache": False,
"body": {
"query": {
"match_all": {}
}
}
}

with query_runner:
num_ops, ops_unit = query_runner(es, params)

self.assertEqual(1, num_ops)
self.assertEqual("ops", ops_unit)

@mock.patch("elasticsearch.Elasticsearch")
def test_scroll_query_only_one_page(self, es):
# page 1
es.search.return_value = {
"_scroll_id": "some-scroll-id",
"hits": {
"hits": [
{
"some-doc-1"
}
]
}
}
es.transport.perform_request.side_effect = [
# delete scroll id response
{
"acknowledged": True
}
]

query_runner = runner.Query()

params = {
"pages": 1,
"items_per_page": 100,
"index": "unittest",
"type": "type",
"use_request_cache": False,
"body": {
"query": {
"match_all": {}
}
}
}

with query_runner:
num_ops, ops_unit = query_runner(es, params)

self.assertEqual(1, num_ops)
self.assertEqual("ops", ops_unit)

@mock.patch("elasticsearch.Elasticsearch")
def test_scroll_query_with_explicit_number_of_pages(self, es):
# page 1
es.search.return_value = {
"_scroll_id": "some-scroll-id",
"hits": {
"hits": [
{
"some-doc-1"
}
]
}
}
es.transport.perform_request.side_effect = [
# page 2
{
"_scroll_id": "some-scroll-id",
"hits": {
"hits": [
{
"some-doc-2"
}
]
}
},
# delete scroll id response
{
"acknowledged": True
}
]

query_runner = runner.Query()

params = {
"pages": 2,
"items_per_page": 100,
"index": "unittest",
"type": "type",
"use_request_cache": False,
"body": {
"query": {
"match_all": {}
}
}
}

with query_runner:
num_ops, ops_unit = query_runner(es, params)

self.assertEqual(2, num_ops)
self.assertEqual("ops", ops_unit)

@mock.patch("elasticsearch.Elasticsearch")
def test_scroll_query_early_termination(self, es):
# page 1
es.search.return_value = {
"_scroll_id": "some-scroll-id",
"hits": {
"hits": [
{
"some-doc-1"
}
]
}
}
es.transport.perform_request.side_effect = [
# page 2 has no results
{
"_scroll_id": "some-scroll-id",
"hits": {
"hits": []
}
},
# delete scroll id response
{
"acknowledged": True
}
]

query_runner = runner.Query()

params = {
"pages": 5,
"items_per_page": 100,
"index": "unittest",
"type": "type",
"use_request_cache": False,
"body": {
"query": {
"match_all": {}
}
}
}

with query_runner:
num_ops, ops_unit = query_runner(es, params)

self.assertEqual(2, num_ops)
self.assertEqual("ops", ops_unit)

@mock.patch("elasticsearch.Elasticsearch")
def test_scroll_query_request_all_pages(self, es):
# page 1
es.search.return_value = {
"_scroll_id": "some-scroll-id",
"hits": {
"hits": [
{
"some-doc-1"
}
]
}
}
es.transport.perform_request.side_effect = [
# page 2 has no results
{
"_scroll_id": "some-scroll-id",
"hits": {
"hits": []
}
},
# delete scroll id response
{
"acknowledged": True
}
]

query_runner = runner.Query()

params = {
"pages": "all",
"items_per_page": 100,
"index": "unittest",
"type": "type",
"use_request_cache": False,
"body": {
"query": {
"match_all": {}
}
}
}

with query_runner:
num_ops, ops_unit = query_runner(es, params)

self.assertEqual(2, num_ops)
self.assertEqual("ops", ops_unit)

0 comments on commit 84d6bf1

Please sign in to comment.