Skip to content

Commit

Permalink
Hubspot backport: HBASE-27672 Read RPC threads may BLOCKED at the Con…
Browse files Browse the repository at this point in the history
…figuration.get when using java compression (apache#5075) (apache#5084) (apache#76)

Signed-off-by: Bryan Beaudreault <[email protected]>
Co-authored-by: Xiaolin Ha <[email protected]>
  • Loading branch information
charlesconnell and sunhelly authored Feb 7, 2024
1 parent 53f2137 commit 73223be
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";

private Configuration conf;
private int bufferSize;

public Lz4Codec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -59,6 +61,7 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -79,7 +82,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -90,7 +93,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class LzoCodec implements Configurable, CompressionCodec {
public static final String LZO_BUFFER_SIZE_KEY = "hbase.io.compress.lzo.buffersize";

private Configuration conf;
private int bufferSize;

public LzoCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -59,6 +61,7 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -79,7 +82,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -90,7 +93,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";

private Configuration conf;
private int bufferSize;

public SnappyCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -59,6 +61,7 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -79,7 +82,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -90,7 +93,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ public class ZstdCodec implements Configurable, CompressionCodec {
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;

private Configuration conf;
private int bufferSize;

public ZstdCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -87,7 +90,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -98,7 +101,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ public class BrotliCodec implements Configurable, CompressionCodec {
public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024;

private Configuration conf;
private int bufferSize;
private int level;
private int window;

public BrotliCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
level = getLevel(conf);
window = getWindow(conf);
}

@Override
Expand All @@ -60,16 +66,19 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
this.level = getLevel(conf);
this.window = getWindow(conf);
}

@Override
public Compressor createCompressor() {
return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf));
return new BrotliCompressor(level, window, bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new BrotliDecompressor(getBufferSize(conf));
return new BrotliDecompressor(bufferSize);
}

@Override
Expand All @@ -80,7 +89,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -91,7 +100,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";

private Configuration conf;
private int bufferSize;

public Lz4Codec() {
conf = new Configuration();
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -57,16 +59,17 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
public Compressor createCompressor() {
return new Lz4Compressor(getBufferSize(conf));
return new Lz4Compressor(bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new Lz4Decompressor(getBufferSize(conf));
return new Lz4Decompressor(bufferSize);
}

@Override
Expand All @@ -77,7 +80,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -88,7 +91,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";

private Configuration conf;
private int bufferSize;

public SnappyCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -57,16 +59,17 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
public Compressor createCompressor() {
return new SnappyCompressor(getBufferSize(conf));
return new SnappyCompressor(bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new SnappyDecompressor(getBufferSize(conf));
return new SnappyDecompressor(bufferSize);
}

@Override
Expand All @@ -77,7 +80,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -88,7 +91,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ public class LzmaCodec implements Configurable, CompressionCodec {
public static final int LZMA_BUFFERSIZE_DEFAULT = 256 * 1024;

private Configuration conf;
private int bufferSize;
private int level;

public LzmaCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
level = getLevel(conf);
}

@Override
Expand All @@ -57,16 +61,18 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
this.level = getLevel(conf);
}

@Override
public Compressor createCompressor() {
return new LzmaCompressor(getLevel(conf), getBufferSize(conf));
return new LzmaCompressor(level, bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new LzmaDecompressor(getBufferSize(conf));
return new LzmaDecompressor(bufferSize);
}

@Override
Expand All @@ -77,7 +83,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -88,7 +94,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Loading

0 comments on commit 73223be

Please sign in to comment.