Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pbzip2/pigz to decompress corpora if available #947

Merged
merged 6 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/Dockerfiles/Dockerfile-dev
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ ARG RALLY_LICENSE
ENV RALLY_RUNNING_IN_DOCKER True

RUN apt-get -y update && \
apt-get install -y curl git && \
apt-get install -y curl git pbzip2 pigz && \
apt-get -y upgrade && \
rm -rf /var/lib/apt/lists/*

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfiles/Dockerfile-release
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ARG RALLY_LICENSE
ENV RALLY_RUNNING_IN_DOCKER True

RUN apt-get -y update && \
apt-get install -y curl git gcc && \
apt-get install -y curl git gcc pbzip2 pigz && \
apt-get -y upgrade && \
rm -rf /var/lib/apt/lists/*

Expand Down
29 changes: 29 additions & 0 deletions docs/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,35 @@ In all other cases, Rally requires ``git 1.9`` or better. Verify with ``git --ve

``git`` is already installed on MacOS.

pbzip2
~~~~~~

It is strongly recommended to install ``pbzip2`` to speed up decompressing the corpora of Rally `standard tracks <https://github.com/elastic/rally-tracks>`_.
If you have created :doc:`custom tracks </adding_tracks>` using corpora compressed with ``gzip`` instead of ``bzip2``, it's also advisable to install ``pigz`` to speed up the process.

**Debian / Ubuntu**

::

sudo apt-get install pbzip2

**Red Hat / CentOS / Amazon Linux**

``pbzip2`` is available via the `EPEL repository <https://fedoraproject.org/wiki/EPEL#Quickstart>`_.

::

sudo yum install pbzip2

**MacOS**

Install via `Homebrew <https://brew.sh/>`_:

::

brew install pbzip2


JDK
~~~

Expand Down
53 changes: 47 additions & 6 deletions esrally/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

import bz2
import gzip
import logging
import os
import re
import shutil
import subprocess
import tarfile
import zipfile
Expand Down Expand Up @@ -244,6 +246,15 @@ def is_archive(name):
return ext in [".zip", ".bz2", ".gz", ".tar", ".tar.gz", ".tgz", ".tar.bz2"]


def is_executable(name):
    """
    Check whether a command can be resolved to an executable.

    :param name: Name of the command to look up.
    :return: True if ``shutil.which`` resolves ``name`` to an executable, False otherwise.
    """
    resolved_path = shutil.which(name)
    if resolved_path is None:
        return False
    return True


def compress(source_directory, archive_name):
"""
Compress a directory tree.
Expand Down Expand Up @@ -274,24 +285,54 @@ def decompress(zip_name, target_directory):
this function.
"""
path_without_extension, extension = splitext(zip_name)
filename = basename(path_without_extension)
if extension == ".zip":
_do_decompress(target_directory, zipfile.ZipFile(zip_name))
elif extension == ".bz2":
_do_decompress_manually(target_directory, filename, bz2.open(zip_name))
decompressor_args = ["pbzip2", "-d", "-k", "-m10000", "-c"]
dliappis marked this conversation as resolved.
Show resolved Hide resolved
decompressor_lib = bz2.open
_do_decompress_manually(target_directory, zip_name, decompressor_args, decompressor_lib)
elif extension == ".gz":
_do_decompress_manually(target_directory, filename, gzip.open(zip_name))
decompressor_args = ["pigz", "-d", "-k", "-c"]
decompressor_lib = gzip.open
_do_decompress_manually(target_directory, zip_name, decompressor_args, decompressor_lib)
elif extension in [".tar", ".tar.gz", ".tgz", ".tar.bz2"]:
_do_decompress(target_directory, tarfile.open(zip_name))
else:
raise RuntimeError("Unsupported file extension [%s]. Cannot decompress [%s]" % (extension, zip_name))


def _do_decompress_manually(target_directory, filename, compressed_file):
def _do_decompress_manually(target_directory, filename, decompressor_args, decompressor_lib):
    """
    Decompress a single file, preferring an external tool over the standard library.

    :param target_directory: Directory that receives the decompressed file.
    :param filename: Path of the compressed file.
    :param decompressor_args: Command line of the external decompressor; its first element is the binary name.
    :param decompressor_lib: Callable that opens ``filename`` for reading decompressed data (used as the fallback).
    """
    tool = decompressor_args[0]
    target_name = basename(splitext(filename)[0])

    if not is_executable(tool):
        logging.getLogger(__name__).warning("%s not found in PATH. Using standard library, decompression will take longer.",
                                            tool)
    elif _do_decompress_manually_external(target_directory, filename, target_name, decompressor_args):
        # The external tool succeeded, so there is nothing left to do.
        return

    # Reached when the tool is missing or when it ran but reported a failure.
    _do_decompress_manually_with_lib(target_directory, filename, decompressor_lib(filename))


def _do_decompress_manually_external(target_directory, filename, base_path_without_extension, decompressor_args):
    """
    Decompress a file by running an external command and capturing its standard output.

    :param target_directory: Directory that receives the decompressed file. It is created if it does not exist.
    :param filename: Path of the compressed file; appended as the last argument of the command.
    :param base_path_without_extension: Name of the decompressed file inside ``target_directory``.
    :param decompressor_args: Command line of the decompressor. The command's standard output is written to the
                              target file.
    :return: True if the command exited successfully. False if it exited with a non-zero status, in which case a
             warning is logged so the caller can fall back to another implementation.
    """
    # Create the target directory up front so that a missing directory does not surface as a
    # FileNotFoundError from open() below.
    if target_directory:
        os.makedirs(target_directory, exist_ok=True)

    with open(os.path.join(target_directory, base_path_without_extension), "wb") as new_file:
        try:
            # stderr is captured so it can be included in the warning if the command fails.
            subprocess.run(decompressor_args + [filename], stdout=new_file, stderr=subprocess.PIPE, check=True)
        except subprocess.CalledProcessError as err:
            logging.getLogger(__name__).warning("Failed to decompress [%s] with [%s]. Error [%s]. Falling back to standard library.",
                                                filename, err.cmd, err.stderr)
            return False
    return True


def _do_decompress_manually_with_lib(target_directory, filename, compressed_file):
path_without_extension = basename(splitext(filename)[0])

ensure_dir(target_directory)
try:
with open("%s/%s" % (target_directory, filename), 'wb') as new_file:
for data in iter(lambda: compressed_file.read(100 * 1024), b''):
with open(os.path.join(target_directory, path_without_extension), "wb") as new_file:
for data in iter(lambda: compressed_file.read(100 * 1024), b""):
new_file.write(data)
finally:
compressed_file.close()
Expand Down
54 changes: 46 additions & 8 deletions tests/utils/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os
import subprocess
import tempfile
import unittest.mock as mock
from unittest import TestCase
Expand Down Expand Up @@ -93,19 +94,56 @@ def test_has_extension(self):
self.assertFalse(io.has_extension("/tmp/README", "README"))


class DecompressionTests(TestCase):
class TestDecompression:
def test_decompresses_supported_file_formats(self):
for ext in ["zip", "gz", "bz2", "tgz", "tar.bz2", "tar.gz"]:
tmp_dir = tempfile.mkdtemp()
archive_path = "%s/resources/test.txt.%s" % (os.path.dirname(os.path.abspath(__file__)), ext)
decompressed_path = "%s/test.txt" % tmp_dir
archive_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources", f"test.txt.{ext}")
decompressed_path = os.path.join(tmp_dir, "test.txt")

io.decompress(archive_path, target_directory=tmp_dir)

self.assertTrue(os.path.exists(decompressed_path), msg="Could not decompress [%s] to [%s] (target file does not exist)" %
(archive_path, decompressed_path))
self.assertEqual("Sample text for DecompressionTests\n", self.read(decompressed_path),
msg="Could not decompress [%s] to [%s] (target file is corrupt)" % (archive_path, decompressed_path))
assert os.path.exists(decompressed_path) is True,\
f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file does not exist)"
assert self.read(decompressed_path) == "Sample text for DecompressionTests\n",\
f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file is corrupt)"

@mock.patch.object(io, "is_executable", return_value=False)
def test_decompresses_supported_file_formats_with_lib_as_failover(self, mocked_is_executable):
    """
    Decompress every supported archive type while ``io.is_executable`` is patched to return False.

    For the ``bz2`` and ``gz`` archives the test additionally checks that the warning about the missing
    external tool was logged.
    """
    resources_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources")
    logger = logging.getLogger("esrally.utils.io")

    for ext in ["zip", "gz", "bz2", "tgz", "tar.bz2", "tar.gz"]:
        # TemporaryDirectory removes the directory again once the block is left.
        with tempfile.TemporaryDirectory() as tmp_dir:
            archive_path = os.path.join(resources_dir, f"test.txt.{ext}")
            decompressed_path = os.path.join(tmp_dir, "test.txt")

            with mock.patch.object(logger, "warning") as mocked_console_warn:
                io.decompress(archive_path, target_directory=tmp_dir)

            assert os.path.exists(decompressed_path) is True,\
                f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file does not exist)"
            assert self.read(decompressed_path) == "Sample text for DecompressionTests\n",\
                f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file is corrupt)"

            if ext in ["bz2", "gz"]:
                # First positional argument of the warning call is the log message template.
                assert "not found in PATH. Using standard library, decompression will take longer." in \
                    mocked_console_warn.call_args[0][0]

@mock.patch("subprocess.run")
def test_decompress_manually_external_fails_if_tool_missing(self, mocked_run):
    """
    Check the failure path of ``io._do_decompress_manually_external``.

    ``subprocess.run`` is mocked to raise ``CalledProcessError``; the function is expected to log exactly one
    warning containing the file name, the command and the captured stderr, and to return False.
    """
    base_path_without_extension = "corpus"
    archive_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources", "test.txt.bz2")
    decompressor_bin = "pbzip2"
    # Plain f-string placeholder: a shell-style "${...}" would put a literal "$" into the command.
    decompress_cmd = f"{decompressor_bin} -d -k -m10000 -c {archive_path}"
    stderr_msg = "Error details here"
    expected_err = "Failed to decompress [%s] with [%s]. Error [%s]. Falling back to standard library."
    mocked_run.side_effect = subprocess.CalledProcessError(cmd=decompress_cmd, returncode=1, stderr=stderr_msg)

    logger = logging.getLogger("esrally.utils.io")
    # TemporaryDirectory removes the directory again once the block is left.
    with tempfile.TemporaryDirectory() as tmp_dir:
        with mock.patch.object(logger, "warning") as mocked_warn_logger:
            result = io._do_decompress_manually_external(tmp_dir, archive_path, base_path_without_extension, [decompressor_bin])

    mocked_warn_logger.assert_called_once_with(expected_err, archive_path, decompress_cmd, stderr_msg)
    assert result is False

def read(self, f):
with open(f, 'r') as content_file:
Expand Down