diff --git a/it/tracker_test.py b/it/tracker_test.py
index cb407a5e1..ba5b35773 100644
--- a/it/tracker_test.py
+++ b/it/tracker_test.py
@@ -13,7 +13,7 @@
 # not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
@@ -38,7 +38,12 @@
 def test_cluster():
     test_execution_id = str(uuid.uuid4())
     it.wait_until_port_is_free(port_number=port)
-    cluster.install(distribution_version=dist, node_name="benchmark-node", provision_config_instance="4gheap", http_port=port)
+    cluster.install(
+        distribution_version=dist,
+        node_name="benchmark-node",
+        provision_config_instance="4gheap",
+        http_port=port,
+    )
     cluster.start(test_execution_id=test_execution_id)
     yield cluster
     cluster.stop()
@@ -47,23 +52,33 @@ def test_cluster():
 @it.benchmark_in_mem
 def test_create_workload(cfg, tmp_path, test_cluster):
     # prepare some data
-    cmd = f"--test-mode --pipeline=benchmark-only --target-hosts=127.0.0.1:{test_cluster.http_port} " \
-          f" --workload=geonames --test-procedure=append-no-conflicts-index-only --quiet"
+    cmd = (
+        f"--test-mode --pipeline=benchmark-only --target-hosts=127.0.0.1:{test_cluster.http_port} "
+        f" --workload=geonames --test-procedure=append-no-conflicts-index-only --quiet"
+    )
     assert it.execute_test(cfg, cmd) == 0
 
     # create the workload
     workload_name = f"test-workload-{uuid.uuid4()}"
     workload_path = tmp_path / workload_name
 
-    assert it.osbenchmark(cfg, f"create-workload --target-hosts=127.0.0.1:{test_cluster.http_port} --indices=geonames "
-                               f"--workload={workload_name} --output-path={tmp_path}") == 0
+    assert (
+        it.osbenchmark(
+            cfg,
+            f"create-workload --target-hosts=127.0.0.1:{test_cluster.http_port} --indices=geonames "
+            f"--workload={workload_name} --output-path={tmp_path}",
+        )
+        == 0
+    )
 
-    expected_files = ["workload.json",
-                      "geonames.json",
-                      "geonames-documents-1k.json",
-                      "geonames-documents.json",
-                      "geonames-documents-1k.json.bz2",
-                      "geonames-documents.json.bz2"]
+    expected_files = [
+        "workload.json",
+        "geonames.json",
+        "geonames-documents-1k.json",
+        "geonames-documents.json",
+        "geonames-documents-1k.json.bz2",
+        "geonames-documents.json.bz2",
+    ]
 
     for f in expected_files:
         full_path = workload_path / f
diff --git a/osbenchmark/workload_generator/corpus.py b/osbenchmark/workload_generator/corpus.py
index cd4f46bac..3fbb9bca3 100644
--- a/osbenchmark/workload_generator/corpus.py
+++ b/osbenchmark/workload_generator/corpus.py
@@ -126,8 +126,8 @@ def dump_documents_range(
     start_doc,
     end_doc,
     total_docs,
-    bsize=None,
-    custom_dump_query=None,
+    bsize=0,
+    custom_dump_query='{"match_all": {}}',
 ):
     """
     Extract documents in the range of start_doc and end_doc and write to individual files
@@ -151,14 +151,6 @@ def dump_documents_range(
     with open(comp_outpath, "wb") as comp_outfile:
         max_doc = total_docs if end_doc > total_docs else end_doc
 
-        logger.info(
-            "Dumping corpus for index [%s] to [%s] for docs %s-%s.",
-            index,
-            out_path,
-            start_doc,
-            max_doc,
-        )
-
         batch_size = bsize if bsize > 0 else (max_doc - start_doc) // 5
         if batch_size < 1:
             batch_size = 1
@@ -175,7 +167,6 @@ def dump_documents_range(
             }
         else:
             query = {
-                # {"match_all": {}}
                 "query": custom_dump_query,
                 "size": batch_size,
                 "sort": [{"_id": "asc"}],
@@ -192,7 +183,7 @@ def dump_documents_range(
             try:
                 search_after = doc["sort"]
             except KeyError:
-                logger.info("%s", doc)
+                logger.info("Error in response format: %s", doc)
             data = (
                 json.dumps(doc["_source"], separators=(",", ":")) + "\n"
             ).encode("utf-8")
@@ -207,6 +198,8 @@ def dump_documents_range(
 
         comp_outfile.write(compressor.flush())
 
+    logger.info("Finished dumping corpus for index [%s] to [%s].", index, out_path)
+
 
 def dump_documents(
     concurrent,
@@ -270,12 +263,13 @@ def dump_documents(
             0,
             number_of_docs,
             number_of_docs,
-            progress_message_suffix,
         )
        merge_json_files(out_path, [(0, number_of_docs)])
 
 
 def merge_json_files(out_path, ranges):
+    logger = logging.getLogger(__name__)
+
     for EXT in [OUT_EXT, OUT_EXT + COMP_EXT]:
         merged_file_path = f"{out_path}" + EXT
         with open(merged_file_path, "wb") as merged_file:
@@ -284,4 +278,9 @@ def merge_json_files(out_path, ranges):
                 with open(file_path, "rb") as f:
                     for line in f:
                         merged_file.write(line)
-                os.remove(file_path)
+                try:
+                    os.remove(file_path)
+                except OSError:
+                    pass
+
+    logger.info("Finished merging shards into [%s].", merged_file_path)
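Note on the pagination scheme above: `dump_documents_range` pages through the index with OpenSearch's `search_after` mechanism, sorting on `_id` and resuming each page after the sort key of the previous page's last hit. Because `search_after` keeps no server-side cursor (unlike scroll), independent document ranges can be dumped concurrently. A minimal sketch of that loop, assuming an opensearch-py client; the host, index name, and batch size below are illustrative, not taken from this change:

    import json

    from opensearchpy import OpenSearch

    client = OpenSearch(hosts=["127.0.0.1:9200"])  # hypothetical local cluster
    batch_size = 1000  # illustrative; the patch derives this from bsize or the range span
    search_after = None

    while True:
        body = {
            "query": {"match_all": {}},
            "size": batch_size,
            "sort": [{"_id": "asc"}],
        }
        if search_after is not None:
            # Resume after the sort key of the last hit on the previous page.
            body["search_after"] = search_after
        hits = client.search(index="test", body=body)["hits"]["hits"]
        if not hits:
            break
        for doc in hits:
            # Serialize one document per line, as the dump files expect.
            line = (json.dumps(doc["_source"], separators=(",", ":")) + "\n").encode("utf-8")
            search_after = doc["sort"]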
diff --git a/tests/workload_generator/corpus_test.py b/tests/workload_generator/corpus_test.py
index 30f089260..81259f07d 100644
--- a/tests/workload_generator/corpus_test.py
+++ b/tests/workload_generator/corpus_test.py
@@ -13,7 +13,7 @@
 # not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
@@ -35,31 +35,23 @@
 @mock.patch("builtins.open", new_callable=mock.mock_open)
 @mock.patch("opensearchpy.OpenSearch")
 def test_extract(client, mo):
-    doc = {
-        "field1": "stuff",
-        "field2": "things"
-    }
+    doc = {"field1": "stuff", "field2": "things"}
     doc_data = serialize_doc(doc)
-    client.count.return_value = {
-        "count": 1001
-    }
+    client.count.return_value = {"count": 1001}
     client.search.return_value = {
         "_scroll_id": "uohialjrknf",
-        "_shards": {
-            "successful": 1,
-            "total": 1,
-            "skipped": 0
-        },
+        "_shards": {"successful": 1, "total": 1, "skipped": 0},
         "hits": {
             "hits": [
                 {
                     "_index": "test",
                     "_id": "0",
                     "_score": 0,
-                    "_source": doc
+                    "_source": doc,
+                    "sort": [0],
                 }
             ]
-        }
+        },
     }
 
     def set_corp_size(*args, **kwargs):
@@ -79,12 +71,16 @@ def set_corp_size(*args, **kwargs):
     with mock.patch("os.stat") as osstat:
         osstat.side_effect = set_corp_size
         res = corpus.extract(client, outdir, index)
-        assert mo.call_count == 4
-        mo.assert_has_calls([call("/abs/outpath/to/workloads/test-documents.json", "wb"),
-                             call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"),
-                             call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"),
-                             call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb")
-                             ], any_order=True)
+        assert mo.call_count == 12
+        mo.assert_has_calls(
+            [
+                call("/abs/outpath/to/workloads/test-documents.json", "wb"),
+                call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"),
+                call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"),
+                call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb"),
+            ],
+            any_order=True,
+        )
 
     assert res == {
         "filename": "test-documents.json.bz2",
@@ -92,7 +88,72 @@ def set_corp_size(*args, **kwargs):
         "compressed_bytes": 500,
         "index_name": "test",
         "doc_count": 1001,
-        "uncompressed_bytes": 1000
+        "uncompressed_bytes": 1000,
+    }
+
+    file_mock = mo.return_value
+    file_mock.assert_has_calls([call.write(doc_data)])
+
+
+@mock.patch("builtins.open", new_callable=mock.mock_open)
+@mock.patch("opensearchpy.OpenSearch") +def test_extract_concurrent(client, mo): + doc = {"field1": "stuff", "field2": "things"} + doc_data = serialize_doc(doc) + client.count.return_value = {"count": 1501} + client.search.return_value = { + "_scroll_id": "uohialjrknf", + "_shards": {"successful": 1, "total": 1, "skipped": 0}, + "hits": { + "hits": [ + { + "_index": "test", + "_id": "0", + "_score": 0, + "_source": doc, + "sort": [0], + } + ] + }, + } + + def set_corp_size(*args, **kwargs): + path = args[0] + mockstat = mock.Mock() + if ".bz2" in path: + mockstat.st_size = 500 + else: + mockstat.st_size = 1000 + return mockstat + + client.scroll.return_value = {} + + index = "test" + outdir = "/abs/outpath/to/workloads/" + + with mock.patch("os.stat") as osstat: + osstat.side_effect = set_corp_size + res = corpus.extract( + client, outdir, index, concurrent=True, threads=4, bsize=100 + ) + assert mo.call_count == 40 + mo.assert_has_calls( + [ + call("/abs/outpath/to/workloads/test-documents.json", "wb"), + call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"), + call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb"), + ], + any_order=True, + ) + + assert res == { + "filename": "test-documents.json.bz2", + "path": "/abs/outpath/to/workloads/test-documents.json.bz2", + "compressed_bytes": 500, + "index_name": "test", + "doc_count": 1501, + "uncompressed_bytes": 1000, } file_mock = mo.return_value