Download pre-generated offset files only when needed (#573)
Signed-off-by: Govind Kamat <[email protected]>
gkamat authored Jul 2, 2024
1 parent 8de0082 commit f3af656
Showing 3 changed files with 52 additions and 34 deletions.
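
At a high level, the change makes corpus preparation first try to fetch a pre-generated offset file ("<data file>.offset") from the corpus base URL and only fall back to generating the file offset table locally when that download fails with HTTP 403 or 404. Below is a minimal, standalone sketch of the idea using only the standard library; it is illustrative only and does not mirror OSB's own Downloader or exceptions.DataError handling shown in the diffs that follow (ensure_offset_table is a name made up for this sketch).

import os
import urllib.error
import urllib.request

def ensure_offset_table(data_file_path, base_url):
    """Return True if a pre-generated offset file was downloaded, False if it must be built locally."""
    offset_name = os.path.basename(data_file_path) + ".offset"
    target = data_file_path + ".offset"
    try:
        # Corpora that publish pre-generated offset files serve them next to the data files.
        urllib.request.urlretrieve(f"{base_url.rstrip('/')}/{offset_name}", target)
        return True
    except urllib.error.HTTPError as e:
        if e.code in (403, 404):
            # No pre-generated file published; the caller generates it from the corpus data.
            return False
        raise
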
17 changes: 12 additions & 5 deletions osbenchmark/utils/io.py
@@ -26,17 +26,16 @@
import gzip
import logging
import os
import mmap
import shutil
import subprocess
import tarfile
import zipfile
import urllib.error
from contextlib import suppress

import mmap


import zstandard as zstd

from osbenchmark import exceptions
from osbenchmark.utils import console


@@ -560,7 +559,7 @@ def remove(data_file_path):
os.remove(f"{data_file_path}.offset")


-def prepare_file_offset_table(data_file_path):
+def prepare_file_offset_table(data_file_path, base_url, source_url, downloader):
"""
Creates a file that contains a mapping from line numbers to file offsets for the provided path. This file is used internally by
#skip_lines(data_file_path, data_file) to speed up line skipping.
@@ -569,6 +568,14 @@ def prepare_file_offset_table(data_file_path):
:return The number of lines read or ``None`` if it did not have to build the file offset table.
"""
file_offset_table = FileOffsetTable.create_for_data_file(data_file_path)
+    if not file_offset_table.is_valid():
+        if not source_url:
+            try:
+                downloader.download(base_url, None, data_file_path + '.offset', None)
+            except exceptions.DataError as e:
+                if isinstance(e.cause, urllib.error.HTTPError) and (e.cause.code == 403 or e.cause.code == 404):
+                    logging.getLogger(__name__).info("Pre-generated offset file not found, will generate from corpus data")
+
if not file_offset_table.is_valid():
console.info("Preparing file offset table for [%s] ... " % data_file_path, end="", flush=True)
line_number = 0
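
For background, the docstring above describes what the offset table is for: it maps line numbers to byte offsets so that skip_lines can seek close to a target line instead of reading the whole data file. The exact on-disk format is not shown in this diff, so the sketch below assumes the entries have already been parsed into (line_number, byte_offset) pairs; it only illustrates how such a table speeds up line skipping.

def skip_lines_with_offsets(data_file, offset_entries, lines_to_skip):
    # offset_entries: (line_number, byte_offset) pairs sorted by line_number (assumed format).
    best_line, best_offset = 0, 0
    for line_number, byte_offset in offset_entries:
        if line_number <= lines_to_skip:
            best_line, best_offset = line_number, byte_offset
        else:
            break
    # Jump as close as possible, then read the remaining lines one by one.
    data_file.seek(best_offset)
    for _ in range(lines_to_skip - best_line):
        data_file.readline()
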
16 changes: 5 additions & 11 deletions osbenchmark/workload/loader.py
@@ -490,7 +490,7 @@ def download(self, base_url, source_url, target_path, size_in_bytes):
self.logger.info("Downloading data from [%s] to [%s].", data_url, target_path)

# we want to have a bit more accurate download progress as these files are typically very large
-progress = net.Progress("[INFO] Downloading workload data: " + os.path.basename(target_path),
+progress = net.Progress("[INFO] Downloading workload data file: " + os.path.basename(target_path),
accuracy=1)
net.download(data_url, target_path, size_in_bytes, progress_indicator=progress)
progress.finish()
@@ -532,9 +532,9 @@ def is_locally_available(self, file_name):
def has_expected_size(self, file_name, expected_size):
return expected_size is None or os.path.getsize(file_name) == expected_size

-def create_file_offset_table(self, document_file_path, expected_number_of_lines):
+def create_file_offset_table(self, document_file_path, base_url, source_url, expected_number_of_lines):
# just rebuild the file every time for the time being. Later on, we might check the data file fingerprint to avoid it
-lines_read = io.prepare_file_offset_table(document_file_path)
+lines_read = io.prepare_file_offset_table(document_file_path, base_url, source_url, self.downloader)
if lines_read and lines_read != expected_number_of_lines:
io.remove_file_offset_table(document_file_path)
raise exceptions.DataError(f"Data in [{document_file_path}] for workload [{self.workload_name}] are invalid. "
@@ -588,13 +588,7 @@ def prepare_document_set(self, document_set, data_root):
else:
raise
if document_set.support_file_offset_table:
-            if not document_set.source_url:
-                try:
-                    self.downloader.download(document_set.base_url, None, doc_path + '.offset', None)
-                except exceptions.DataError as e:
-                    if isinstance(e.cause, urllib.error.HTTPError) and (e.cause.code == 403 or e.cause.code == 404):
-                        self.logger.info("Pre-generated offset file not found, will generate from corpus data")
-            self.create_file_offset_table(doc_path, document_set.number_of_lines)
+            self.create_file_offset_table(doc_path, document_set.base_url, document_set.source_url, document_set.number_of_lines)

def prepare_bundled_document_set(self, document_set, data_root):
"""
@@ -621,7 +615,7 @@
while True:
if self.is_locally_available(doc_path):
if self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes):
-self.create_file_offset_table(doc_path, document_set.number_of_lines)
+self.create_file_offset_table(doc_path, document_set.base_url, document_set.source_url, document_set.number_of_lines)
return True
else:
raise exceptions.DataError(f"[{doc_path}] is present but does not have the expected size "
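
The net effect in loader.py is that the download-or-generate decision moves out of prepare_document_set and into io.prepare_file_offset_table, so the bundled document path (prepare_bundled_document_set) now goes through the same logic. One consequence worth noting: when a valid pre-generated offset file is obtained, prepare_file_offset_table returns None, so the line-count validation in create_file_offset_table only applies when the table was built locally. A hedged paraphrase of that check, with ValueError standing in for OSB's exceptions.DataError:

def validate_offset_table(lines_read, expected_number_of_lines):
    # None (or 0) means the offset table did not have to be generated from the corpus data,
    # for example because a valid pre-generated file was downloaded, so there is nothing to compare.
    if lines_read and lines_read != expected_number_of_lines:
        raise ValueError(f"expected {expected_number_of_lines} lines, read {lines_read}")
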
53 changes: 35 additions & 18 deletions tests/workload/loader_test.py
@@ -51,6 +51,22 @@ def stop_watch():
return None


+class InstanceOf:
+    "Tests whether an object belongs to a specified class."
+
+    def __init__(self, cls):
+        self.cls = cls
+
+    def __eq__(self, other):
+        return isinstance(other, self.cls)
+
+    def __ne__(self, other):
+        return not isinstance(other, self.cls)
+
+    def __repr__(self):
+        return f"<{self.cls.__name__} object at {hex(id(self))}>"


class SimpleWorkloadRepositoryTests(TestCase):
@mock.patch("os.path.exists")
@mock.patch("os.path.isdir")
@@ -166,7 +182,7 @@ def test_does_nothing_if_document_file_available(self, is_file, get_size, prepar
uncompressed_size_in_bytes=2000),
data_root="/tmp")

-prepare_file_offset_table.assert_called_with("/tmp/docs.json")
+prepare_file_offset_table.assert_called_with("/tmp/docs.json", None, None, InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
@mock.patch("os.path.getsize")
@@ -188,7 +204,7 @@ def test_decompresses_if_archive_available(self, is_file, get_size, prepare_file
uncompressed_size_in_bytes=2000),
data_root="/tmp")

-prepare_file_offset_table.assert_called_with("/tmp/docs.json")
+prepare_file_offset_table.assert_called_with("/tmp/docs.json", None, None, InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.io.decompress")
@mock.patch("os.path.getsize")
@@ -286,11 +302,10 @@ def test_download_document_archive_if_no_file_available(self, is_file, get_size,
ensure_dir.assert_called_with("/tmp")
decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp")
calls = [ mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2",
"/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY),
mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.offset",
"/tmp/docs.json.offset", None, progress_indicator=mock.ANY) ]
"/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY) ]
download.assert_has_calls(calls)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora/unit-test',
None, InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
@mock.patch("osbenchmark.utils.io.decompress")
@@ -333,7 +348,9 @@ def test_download_document_archive_with_source_url_compressed(self, is_file, get
decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp")
download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2",
"/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY)
-prepare_file_offset_table.assert_called_with("/tmp/docs.json")
+prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora',
+'http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2',
+InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
@mock.patch("osbenchmark.utils.io.decompress")
@@ -371,7 +388,9 @@ def test_download_document_with_source_url_uncompressed(self, is_file, get_size,
ensure_dir.assert_called_with("/tmp")
download.assert_called_with(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json",
"/tmp/docs.json", 2000, progress_indicator=mock.ANY)
-prepare_file_offset_table.assert_called_with("/tmp/docs.json")
+prepare_file_offset_table.assert_called_with("/tmp/docs.json", f"{scheme}://benchmarks.opensearch.org/corpora/",
+f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json",
+InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
@mock.patch("osbenchmark.utils.io.decompress")
@@ -407,11 +426,10 @@ def test_download_document_with_trailing_baseurl_slash(self, is_file, get_size,

ensure_dir.assert_called_with("/tmp")
calls = [ mock.call(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", \
"/tmp/docs.json", 2000, progress_indicator=mock.ANY),
mock.call(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json.offset", \
"/tmp/docs.json.offset", None, progress_indicator=mock.ANY) ]
"/tmp/docs.json", 2000, progress_indicator=mock.ANY) ]
download.assert_has_calls(calls)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/",
None, InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
@mock.patch("osbenchmark.utils.net.download")
Expand Down Expand Up @@ -444,11 +462,10 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en

ensure_dir.assert_called_with("/tmp")
calls = [ mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json", \
"/tmp/docs.json", 2000, progress_indicator=mock.ANY),
mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.offset", \
"/tmp/docs.json.offset", None, progress_indicator=mock.ANY) ]
"/tmp/docs.json", 2000, progress_indicator=mock.ANY) ]
download.assert_has_calls(calls)
prepare_file_offset_table.assert_called_with("/tmp/docs.json")
prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora/unit-test',
None, InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.net.download")
@mock.patch("osbenchmark.utils.io.ensure_dir")
@@ -606,7 +623,7 @@ def test_prepare_bundled_document_set_if_document_file_available(self, is_file,
uncompressed_size_in_bytes=2000),
data_root="."))

-prepare_file_offset_table.assert_called_with("./docs.json")
+prepare_file_offset_table.assert_called_with("./docs.json", None, None, InstanceOf(loader.Downloader))

@mock.patch("osbenchmark.utils.io.prepare_file_offset_table")
@mock.patch("osbenchmark.utils.io.decompress")
@@ -791,7 +808,7 @@ def test_prepare_bundled_document_set_decompresses_compressed_docs(self, is_file
uncompressed_size_in_bytes=2000),
data_root="."))

-prepare_file_offset_table.assert_called_with("./docs.json")
+prepare_file_offset_table.assert_called_with("./docs.json", None, None, InstanceOf(loader.Downloader))

@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
