Skip to content

Commit

Permalink
fix(compression): Fix gzip and zlib performance degradation (#20032)
Browse files Browse the repository at this point in the history
* fix(compression): Fix gzip and zlib performance degradation

Fix gzip and zlib performance degradation caused by:
* flate2 v1.0.28 started to resize its input buffer up to its capacity
  and back to the actual number of bytes written.
* Some sinks are writing to Compressor without buffering,
  resulting in frequent small writes to the flate2 writer.
  Within 32KB of input buffer in flate2, this causes an excessive number of memset operations
  and degraded sink throughput.

This fix introduces a wrapper buffer in front of gzip and zlib writers to accumulate data
before calling the write function of the underlying writer.

Signed-off-by: Artur Malchanau <[email protected]>

* Add a link to the comment with more context.

---------

Signed-off-by: Artur Malchanau <[email protected]>
  • Loading branch information
Hexta authored Mar 8, 2024
1 parent 485dea7 commit d07a435
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 34 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20032_gzip_zlib_performance.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed gzip and zlib compression performance degradation introduced in v0.34.0.

authors: Hexta
97 changes: 63 additions & 34 deletions src/sinks/util/compressor.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::io;
use std::{io, io::BufWriter};

use bytes::{BufMut, BytesMut};
use flate2::write::{GzEncoder, ZlibEncoder};

use super::{snappy::SnappyEncoder, zstd::ZstdEncoder, Compression};

const GZIP_INPUT_BUFFER_CAPACITY: usize = 4_096;
const ZLIB_INPUT_BUFFER_CAPACITY: usize = 4_096;

const OUTPUT_BUFFER_CAPACITY: usize = 1_024;

enum Writer {
Plain(bytes::buf::Writer<BytesMut>),
Gzip(GzEncoder<bytes::buf::Writer<BytesMut>>),
Zlib(ZlibEncoder<bytes::buf::Writer<BytesMut>>),
Gzip(BufWriter<GzEncoder<bytes::buf::Writer<BytesMut>>>),
Zlib(BufWriter<ZlibEncoder<bytes::buf::Writer<BytesMut>>>),
Zstd(ZstdEncoder<bytes::buf::Writer<BytesMut>>),
Snappy(SnappyEncoder<bytes::buf::Writer<BytesMut>>),
}
Expand All @@ -17,21 +22,69 @@ impl Writer {
pub fn get_ref(&self) -> &BytesMut {
match self {
Writer::Plain(inner) => inner.get_ref(),
Writer::Gzip(inner) => inner.get_ref().get_ref(),
Writer::Zlib(inner) => inner.get_ref().get_ref(),
Writer::Gzip(inner) => inner.get_ref().get_ref().get_ref(),
Writer::Zlib(inner) => inner.get_ref().get_ref().get_ref(),
Writer::Zstd(inner) => inner.get_ref().get_ref(),
Writer::Snappy(inner) => inner.get_ref().get_ref(),
}
}

pub fn into_inner(self) -> BytesMut {
match self {
Writer::Plain(writer) => writer,
Writer::Gzip(writer) => writer
.into_inner()
.expect("BufWriter writer should not fail to finish")
.finish()
.expect("gzip writer should not fail to finish"),
Writer::Zlib(writer) => writer
.into_inner()
.expect("BufWriter writer should not fail to finish")
.finish()
.expect("zlib writer should not fail to finish"),
Writer::Zstd(writer) => writer
.finish()
.expect("zstd writer should not fail to finish"),
Writer::Snappy(writer) => writer
.finish()
.expect("snappy writer should not fail to finish"),
}
.into_inner()
}

pub fn finish(self) -> io::Result<BytesMut> {
let buf = match self {
Writer::Plain(writer) => writer,
Writer::Gzip(writer) => writer.into_inner()?.finish()?,
Writer::Zlib(writer) => writer.into_inner()?.finish()?,
Writer::Zstd(writer) => writer.finish()?,
Writer::Snappy(writer) => writer.finish()?,
}
.into_inner();

Ok(buf)
}
}

impl From<Compression> for Writer {
fn from(compression: Compression) -> Self {
let writer = BytesMut::with_capacity(1_024).writer();
let writer = BytesMut::with_capacity(OUTPUT_BUFFER_CAPACITY).writer();
match compression {
Compression::None => Writer::Plain(writer),
Compression::Gzip(level) => Writer::Gzip(GzEncoder::new(writer, level.as_flate2())),
Compression::Zlib(level) => Writer::Zlib(ZlibEncoder::new(writer, level.as_flate2())),
// Buffering writes to the underlying Encoder writer
// to avoid Vec-trashing and expensive memset syscalls.
// https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152
Compression::Gzip(level) => Writer::Gzip(BufWriter::with_capacity(
GZIP_INPUT_BUFFER_CAPACITY,
GzEncoder::new(writer, level.as_flate2()),
)),
// Buffering writes to the underlying Encoder writer
// to avoid Vec-trashing and expensive memset syscalls.
// https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152
Compression::Zlib(level) => Writer::Zlib(BufWriter::with_capacity(
ZLIB_INPUT_BUFFER_CAPACITY,
ZlibEncoder::new(writer, level.as_flate2()),
)),
Compression::Zstd(level) => {
let encoder = ZstdEncoder::new(writer, level.into())
.expect("Zstd encoder should not fail on init.");
Expand Down Expand Up @@ -98,16 +151,7 @@ impl Compressor {
/// If the compressor encounters an I/O error while finalizing the payload, an error
/// variant will be returned.
pub fn finish(self) -> io::Result<BytesMut> {
let buf = match self.inner {
Writer::Plain(writer) => writer,
Writer::Gzip(writer) => writer.finish()?,
Writer::Zlib(writer) => writer.finish()?,
Writer::Zstd(writer) => writer.finish()?,
Writer::Snappy(writer) => writer.finish()?,
}
.into_inner();

Ok(buf)
self.inner.finish()
}

/// Consumes the compressor, returning the internal buffer used by the compressor.
Expand All @@ -120,22 +164,7 @@ impl Compressor {
///
/// Consider using `finish` if catching these scenarios is important.
pub fn into_inner(self) -> BytesMut {
match self.inner {
Writer::Plain(writer) => writer,
Writer::Gzip(writer) => writer
.finish()
.expect("gzip writer should not fail to finish"),
Writer::Zlib(writer) => writer
.finish()
.expect("zlib writer should not fail to finish"),
Writer::Zstd(writer) => writer
.finish()
.expect("zstd writer should not fail to finish"),
Writer::Snappy(writer) => writer
.finish()
.expect("snappy writer should not fail to finish"),
}
.into_inner()
self.inner.into_inner()
}
}

Expand Down

0 comments on commit d07a435

Please sign in to comment.