Skip to content

Commit

Permalink
Use pbzip2/pigz to decompress corpora if available (#947)
Browse files Browse the repository at this point in the history
Decomporessing large corpora using the standard bzip2/gzip libraries
can be a slow process as they only utilize one cpu core.  Take
advantage of pbzip2/pigz, if available, to speed up the process by
taking advantage of all cores.
  • Loading branch information
dliappis authored Apr 6, 2020
1 parent ee06361 commit d7c3575
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfiles/Dockerfile-dev
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ ARG RALLY_LICENSE
ENV RALLY_RUNNING_IN_DOCKER True

RUN apt-get -y update && \
apt-get install -y curl git && \
apt-get install -y curl git pbzip2 pigz && \
apt-get -y upgrade && \
rm -rf /var/lib/apt/lists/*

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfiles/Dockerfile-release
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ARG RALLY_LICENSE
ENV RALLY_RUNNING_IN_DOCKER True

RUN apt-get -y update && \
apt-get install -y curl git gcc && \
apt-get install -y curl git gcc pbzip2 pigz && \
apt-get -y upgrade && \
rm -rf /var/lib/apt/lists/*

Expand Down
29 changes: 29 additions & 0 deletions docs/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,35 @@ In all other cases, Rally requires ``git 1.9`` or better. Verify with ``git --ve

``git`` is already installed on MacOS.

pbzip2
~~~~~~

It is strongly recommended to install ``pbzip2`` to speed up decompressing the corpora of Rally `standard tracks <https://github.com/elastic/rally-tracks>`_.
If you have created :doc:`custom tracks </adding_tracks>` using corpora compressed with ``gzip`` instead of ``bzip2``, it's also advisable to install ``pigz`` to speed up the process.

**Debian / Ubuntu**

::

sudo apt-get install pbzip2

**Red Hat / CentOS / Amazon Linux**

``pbzip`` is available via the `EPEL repository <https://fedoraproject.org/wiki/EPEL#Quickstart>`_.

::

sudo yum install pbzip2

**MacOS**

Install via `Homebrew <https://brew.sh/>`_:

::

brew install pbzip2


JDK
~~~

Expand Down
53 changes: 47 additions & 6 deletions esrally/utils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

import bz2
import gzip
import logging
import os
import re
import shutil
import subprocess
import tarfile
import zipfile
Expand Down Expand Up @@ -244,6 +246,15 @@ def is_archive(name):
return ext in [".zip", ".bz2", ".gz", ".tar", ".tar.gz", ".tgz", ".tar.bz2"]


def is_executable(name):
"""
:param name: File name to check.
:return: True iff given file name is executable and in PATH, all other cases False.
"""

return shutil.which(name) is not None


def compress(source_directory, archive_name):
"""
Compress a directory tree.
Expand Down Expand Up @@ -274,24 +285,54 @@ def decompress(zip_name, target_directory):
this function.
"""
path_without_extension, extension = splitext(zip_name)
filename = basename(path_without_extension)
if extension == ".zip":
_do_decompress(target_directory, zipfile.ZipFile(zip_name))
elif extension == ".bz2":
_do_decompress_manually(target_directory, filename, bz2.open(zip_name))
decompressor_args = ["pbzip2", "-d", "-k", "-m10000", "-c"]
decompressor_lib = bz2.open
_do_decompress_manually(target_directory, zip_name, decompressor_args, decompressor_lib)
elif extension == ".gz":
_do_decompress_manually(target_directory, filename, gzip.open(zip_name))
decompressor_args = ["pigz", "-d", "-k", "-c"]
decompressor_lib = gzip.open
_do_decompress_manually(target_directory, zip_name, decompressor_args, decompressor_lib)
elif extension in [".tar", ".tar.gz", ".tgz", ".tar.bz2"]:
_do_decompress(target_directory, tarfile.open(zip_name))
else:
raise RuntimeError("Unsupported file extension [%s]. Cannot decompress [%s]" % (extension, zip_name))


def _do_decompress_manually(target_directory, filename, compressed_file):
def _do_decompress_manually(target_directory, filename, decompressor_args, decompressor_lib):
decompressor_bin = decompressor_args[0]
base_path_without_extension = basename(splitext(filename)[0])

if is_executable(decompressor_bin):
if _do_decompress_manually_external(target_directory, filename, base_path_without_extension, decompressor_args):
return
else:
logging.getLogger(__name__).warning("%s not found in PATH. Using standard library, decompression will take longer.",
decompressor_bin)

_do_decompress_manually_with_lib(target_directory, filename, decompressor_lib(filename))


def _do_decompress_manually_external(target_directory, filename, base_path_without_extension, decompressor_args):
with open(os.path.join(target_directory, base_path_without_extension), "wb") as new_file:
try:
subprocess.run(decompressor_args + [filename], stdout=new_file, stderr=subprocess.PIPE, check=True)
except subprocess.CalledProcessError as err:
logging.getLogger(__name__).warning("Failed to decompress [%s] with [%s]. Error [%s]. Falling back to standard library.",
filename, err.cmd, err.stderr)
return False
return True


def _do_decompress_manually_with_lib(target_directory, filename, compressed_file):
path_without_extension = basename(splitext(filename)[0])

ensure_dir(target_directory)
try:
with open("%s/%s" % (target_directory, filename), 'wb') as new_file:
for data in iter(lambda: compressed_file.read(100 * 1024), b''):
with open(os.path.join(target_directory, path_without_extension), "wb") as new_file:
for data in iter(lambda: compressed_file.read(100 * 1024), b""):
new_file.write(data)
finally:
compressed_file.close()
Expand Down
54 changes: 46 additions & 8 deletions tests/utils/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os
import subprocess
import tempfile
import unittest.mock as mock
from unittest import TestCase
Expand Down Expand Up @@ -93,19 +94,56 @@ def test_has_extension(self):
self.assertFalse(io.has_extension("/tmp/README", "README"))


class DecompressionTests(TestCase):
class TestDecompression:
def test_decompresses_supported_file_formats(self):
for ext in ["zip", "gz", "bz2", "tgz", "tar.bz2", "tar.gz"]:
tmp_dir = tempfile.mkdtemp()
archive_path = "%s/resources/test.txt.%s" % (os.path.dirname(os.path.abspath(__file__)), ext)
decompressed_path = "%s/test.txt" % tmp_dir
archive_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources", f"test.txt.{ext}")
decompressed_path = os.path.join(tmp_dir, "test.txt")

io.decompress(archive_path, target_directory=tmp_dir)

self.assertTrue(os.path.exists(decompressed_path), msg="Could not decompress [%s] to [%s] (target file does not exist)" %
(archive_path, decompressed_path))
self.assertEqual("Sample text for DecompressionTests\n", self.read(decompressed_path),
msg="Could not decompress [%s] to [%s] (target file is corrupt)" % (archive_path, decompressed_path))
assert os.path.exists(decompressed_path) is True,\
f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file does not exist)"
assert self.read(decompressed_path) == "Sample text for DecompressionTests\n",\
f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file is corrupt)"

@mock.patch.object(io, "is_executable", return_value=False)
def test_decompresses_supported_file_formats_with_lib_as_failover(self, mocked_is_executable):
for ext in ["zip", "gz", "bz2", "tgz", "tar.bz2", "tar.gz"]:
tmp_dir = tempfile.mkdtemp()
archive_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources", f"test.txt.{ext}")
decompressed_path = os.path.join(tmp_dir, "test.txt")

logger = logging.getLogger("esrally.utils.io")
with mock.patch.object(logger, "warning") as mocked_console_warn:
io.decompress(archive_path, target_directory=tmp_dir)

assert os.path.exists(decompressed_path) is True,\
f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file does not exist)"
assert self.read(decompressed_path) == "Sample text for DecompressionTests\n",\
f"Could not decompress [{archive_path}] to [{decompressed_path}] (target file is corrupt)"

if ext in ["bz2", "gz"]:
assert f"not found in PATH. Using standard library, decompression will take longer." in mocked_console_warn.call_args[0][0]

@mock.patch("subprocess.run")
def test_decompress_manually_external_fails_if_tool_missing(self, mocked_run):
base_path_without_extension = "corpus"
archive_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resources", "test.txt.bz2")
tmp_dir = tempfile.mkdtemp()
decompressor_bin = "pbzip2"
decompress_cmd = f"{decompressor_bin} -d -k -m10000 -c ${archive_path}"
stderr_msg = "Error details here"
expected_err = "Failed to decompress [%s] with [%s]. Error [%s]. Falling back to standard library."
mocked_run.side_effect = subprocess.CalledProcessError(cmd=decompress_cmd, returncode=1, stderr=stderr_msg)

logger = logging.getLogger("esrally.utils.io")
with mock.patch.object(logger, "warning") as mocked_warn_logger:
result = io._do_decompress_manually_external(tmp_dir, archive_path, base_path_without_extension, [decompressor_bin])

mocked_warn_logger.assert_called_once_with(expected_err, archive_path, decompress_cmd, stderr_msg)
assert result is False

def read(self, f):
with open(f, 'r') as content_file:
Expand Down

0 comments on commit d7c3575

Please sign in to comment.