Skip to content

Commit

Permalink
Adding unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: AkshathRaghav <[email protected]>
  • Loading branch information
AkshathRaghav committed Nov 8, 2023
1 parent b79c303 commit b6ea5ca
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 48 deletions.
39 changes: 27 additions & 12 deletions it/tracker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
Expand All @@ -38,7 +38,12 @@ def test_cluster():
test_execution_id = str(uuid.uuid4())

it.wait_until_port_is_free(port_number=port)
cluster.install(distribution_version=dist, node_name="benchmark-node", provision_config_instance="4gheap", http_port=port)
cluster.install(
distribution_version=dist,
node_name="benchmark-node",
provision_config_instance="4gheap",
http_port=port,
)
cluster.start(test_execution_id=test_execution_id)
yield cluster
cluster.stop()
Expand All @@ -47,23 +52,33 @@ def test_cluster():
@it.benchmark_in_mem
def test_create_workload(cfg, tmp_path, test_cluster):
# prepare some data
cmd = f"--test-mode --pipeline=benchmark-only --target-hosts=127.0.0.1:{test_cluster.http_port} " \
f" --workload=geonames --test-procedure=append-no-conflicts-index-only --quiet"
cmd = (
f"--test-mode --pipeline=benchmark-only --target-hosts=127.0.0.1:{test_cluster.http_port} "
f" --workload=geonames --test-procedure=append-no-conflicts-index-only --quiet"
)
assert it.execute_test(cfg, cmd) == 0

# create the workload
workload_name = f"test-workload-{uuid.uuid4()}"
workload_path = tmp_path / workload_name

assert it.osbenchmark(cfg, f"create-workload --target-hosts=127.0.0.1:{test_cluster.http_port} --indices=geonames "
f"--workload={workload_name} --output-path={tmp_path}") == 0
assert (
it.osbenchmark(
cfg,
f"create-workload --target-hosts=127.0.0.1:{test_cluster.http_port} --indices=geonames "
f"--workload={workload_name} --output-path={tmp_path}",
)
== 0
)

expected_files = ["workload.json",
"geonames.json",
"geonames-documents-1k.json",
"geonames-documents.json",
"geonames-documents-1k.json.bz2",
"geonames-documents.json.bz2"]
expected_files = [
"workload.json",
"geonames.json",
"geonames-documents-1k.json",
"geonames-documents.json",
"geonames-documents-1k.json.bz2",
"geonames-documents.json.bz2",
]

for f in expected_files:
full_path = workload_path / f
Expand Down
27 changes: 13 additions & 14 deletions osbenchmark/workload_generator/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ def dump_documents_range(
start_doc,
end_doc,
total_docs,
bsize=None,
custom_dump_query=None,
bsize=0,
custom_dump_query='{"match_all": {}}',
):
"""
Extract documents in the range of start_doc and end_doc and write to individual files
Expand All @@ -151,14 +151,6 @@ def dump_documents_range(
with open(comp_outpath, "wb") as comp_outfile:
max_doc = total_docs if end_doc > total_docs else end_doc

logger.info(
"Dumping corpus for index [%s] to [%s] for docs %s-%s.",
index,
out_path,
start_doc,
max_doc,
)

batch_size = bsize if bsize > 0 else (max_doc - start_doc) // 5
if batch_size < 1:
batch_size = 1
Expand All @@ -175,7 +167,6 @@ def dump_documents_range(
}
else:
query = {
# {"match_all": {}}
"query": custom_dump_query,
"size": batch_size,
"sort": [{"_id": "asc"}],
Expand All @@ -192,7 +183,7 @@ def dump_documents_range(
try:
search_after = doc["sort"]
except KeyError:
logger.info("%s", doc)
logger.info("Error in response format: %s", doc)
data = (
json.dumps(doc["_source"], separators=(",", ":")) + "\n"
).encode("utf-8")
Expand All @@ -207,6 +198,8 @@ def dump_documents_range(

comp_outfile.write(compressor.flush())

logger.info("Finished dumping corpus for index [%s] to [%s].", index, out_path)


def dump_documents(
concurrent,
Expand Down Expand Up @@ -270,12 +263,13 @@ def dump_documents(
0,
number_of_docs,
number_of_docs,
progress_message_suffix,
)
merge_json_files(out_path, [(0, number_of_docs)])


def merge_json_files(out_path, ranges):
logger = logging.getLogger(__name__)

for EXT in [OUT_EXT, OUT_EXT + COMP_EXT]:
merged_file_path = f"{out_path}" + EXT
with open(merged_file_path, "wb") as merged_file:
Expand All @@ -284,4 +278,9 @@ def merge_json_files(out_path, ranges):
with open(file_path, "rb") as f:
for line in f:
merged_file.write(line)
os.remove(file_path)
try:
os.remove(file_path)
except:
pass

logger.info("Finished merging shards into [%s].", merged_file_path)
105 changes: 83 additions & 22 deletions tests/workload_generator/corpus_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
Expand All @@ -35,31 +35,23 @@ def serialize_doc(doc):
@mock.patch("builtins.open", new_callable=mock.mock_open)
@mock.patch("opensearchpy.OpenSearch")
def test_extract(client, mo):
doc = {
"field1": "stuff",
"field2": "things"
}
doc = {"field1": "stuff", "field2": "things"}
doc_data = serialize_doc(doc)
client.count.return_value = {
"count": 1001
}
client.count.return_value = {"count": 1001}
client.search.return_value = {
"_scroll_id": "uohialjrknf",
"_shards": {
"successful": 1,
"total": 1,
"skipped": 0
},
"_shards": {"successful": 1, "total": 1, "skipped": 0},
"hits": {
"hits": [
{
"_index": "test",
"_id": "0",
"_score": 0,
"_source": doc
"_source": doc,
"sort": [0],
}
]
}
},
}

def set_corp_size(*args, **kwargs):
Expand All @@ -79,20 +71,89 @@ def set_corp_size(*args, **kwargs):
with mock.patch("os.stat") as osstat:
osstat.side_effect = set_corp_size
res = corpus.extract(client, outdir, index)
assert mo.call_count == 4
mo.assert_has_calls([call("/abs/outpath/to/workloads/test-documents.json", "wb"),
call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"),
call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"),
call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb")
], any_order=True)
assert mo.call_count == 12
mo.assert_has_calls(
[
call("/abs/outpath/to/workloads/test-documents.json", "wb"),
call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"),
call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"),
call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb"),
],
any_order=True,
)

assert res == {
"filename": "test-documents.json.bz2",
"path": "/abs/outpath/to/workloads/test-documents.json.bz2",
"compressed_bytes": 500,
"index_name": "test",
"doc_count": 1001,
"uncompressed_bytes": 1000
"uncompressed_bytes": 1000,
}

file_mock = mo.return_value
file_mock.assert_has_calls([call.write(doc_data)])


@mock.patch("builtins.open", new_callable=mock.mock_open)
@mock.patch("opensearchpy.OpenSearch")
def test_extract_concurrent(client, mo):
doc = {"field1": "stuff", "field2": "things"}
doc_data = serialize_doc(doc)
client.count.return_value = {"count": 1501}
client.search.return_value = {
"_scroll_id": "uohialjrknf",
"_shards": {"successful": 1, "total": 1, "skipped": 0},
"hits": {
"hits": [
{
"_index": "test",
"_id": "0",
"_score": 0,
"_source": doc,
"sort": [0],
}
]
},
}

def set_corp_size(*args, **kwargs):
path = args[0]
mockstat = mock.Mock()
if ".bz2" in path:
mockstat.st_size = 500
else:
mockstat.st_size = 1000
return mockstat

client.scroll.return_value = {}

index = "test"
outdir = "/abs/outpath/to/workloads/"

with mock.patch("os.stat") as osstat:
osstat.side_effect = set_corp_size
res = corpus.extract(
client, outdir, index, concurrent=True, threads=4, bsize=100
)
assert mo.call_count == 40
mo.assert_has_calls(
[
call("/abs/outpath/to/workloads/test-documents.json", "wb"),
call("/abs/outpath/to/workloads/test-documents.json.bz2", "wb"),
call("/abs/outpath/to/workloads/test-documents-1k.json", "wb"),
call("/abs/outpath/to/workloads/test-documents-1k.json.bz2", "wb"),
],
any_order=True,
)

assert res == {
"filename": "test-documents.json.bz2",
"path": "/abs/outpath/to/workloads/test-documents.json.bz2",
"compressed_bytes": 500,
"index_name": "test",
"doc_count": 1501,
"uncompressed_bytes": 1000,
}

file_mock = mo.return_value
Expand Down

0 comments on commit b6ea5ca

Please sign in to comment.