Skip to content

Commit

Permalink
PARQUET-1973: Support ZSTD JNI BufferPool (#865)
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjoon-hyun authored Feb 9, 2021
1 parent 2b73ce3 commit 279255d
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 7 deletions.
6 changes: 6 additions & 0 deletions parquet-hadoop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ ParquetInputFormat to materialize records. It should be a the descendant class o

## Class: ZstandardCodec

**Property:** `parquet.compression.codec.zstd.bufferPool.enabled`
**Description:** If it is true, [RecyclingBufferPool](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/RecyclingBufferPool.java) is used.
**Default value:** `false`

---

**Property:** `parquet.compression.codec.zstd.level`
**Description:** The compression level of ZSTD. The valid range is 1~22. Generally the higher compression level, the higher compression ratio can be achieved, but the writing time will be longer.
**Default value:** `3`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.parquet.hadoop.codec;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.NoPool;
import com.github.luben.zstd.RecyclingBufferPool;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
Expand All @@ -43,6 +46,8 @@
*/
public class ZstandardCodec implements Configurable, CompressionCodec {

public final static String PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED = "parquet.compression.codec.zstd.bufferPool.enabled";
public final static boolean DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED = false;
public final static String PARQUET_COMPRESS_ZSTD_LEVEL = "parquet.compression.codec.zstd.level";
public final static int DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL = 3;
public final static String PARQUET_COMPRESS_ZSTD_WORKERS = "parquet.compression.codec.zstd.workers";
Expand Down Expand Up @@ -80,7 +85,13 @@ public CompressionInputStream createInputStream(InputStream stream, Decompressor

@Override
public CompressionInputStream createInputStream(InputStream stream) throws IOException {
return new ZstdDecompressorStream(stream);
BufferPool pool;
if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
pool = RecyclingBufferPool.INSTANCE;
} else {
pool = NoPool.INSTANCE;
}
return new ZstdDecompressorStream(stream, pool);
}

@Override
Expand All @@ -91,7 +102,14 @@ public CompressionOutputStream createOutputStream(OutputStream stream, Compresso

@Override
public CompressionOutputStream createOutputStream(OutputStream stream) throws IOException {
return new ZstdCompressorStream(stream, conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
BufferPool pool;
if (conf.getBoolean(PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, DEFAULT_PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED)) {
pool = RecyclingBufferPool.INSTANCE;
} else {
pool = NoPool.INSTANCE;
}
return new ZstdCompressorStream(stream, pool,
conf.getInt(PARQUET_COMPRESS_ZSTD_LEVEL, DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL),
conf.getInt(PARQUET_COMPRESS_ZSTD_WORKERS, DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.codec;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.ZstdOutputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;

Expand All @@ -34,6 +35,13 @@ public ZstdCompressorStream(OutputStream stream, int level, int workers) throws
zstdOutputStream.setWorkers(workers);
}

public ZstdCompressorStream(OutputStream stream, BufferPool pool, int level, int workers) throws IOException {
super(stream);
zstdOutputStream = new ZstdOutputStream(stream, pool);
zstdOutputStream.setLevel(level);
zstdOutputStream.setWorkers(workers);
}

public void write(byte[] b, int off, int len) throws IOException {
zstdOutputStream.write(b, off, len);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.codec;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.ZstdInputStream;
import org.apache.hadoop.io.compress.CompressionInputStream;

Expand All @@ -33,6 +34,11 @@ public ZstdDecompressorStream(InputStream stream) throws IOException {
zstdInputStream = new ZstdInputStream(stream);
}

public ZstdDecompressorStream(InputStream stream, BufferPool pool) throws IOException {
super(stream);
zstdInputStream = new ZstdInputStream(stream, pool);
}

public int read(byte[] b, int off, int len) throws IOException {
return zstdInputStream.read(b, off, len);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.parquet.hadoop;

import com.github.luben.zstd.BufferPool;
import com.github.luben.zstd.NoPool;
import com.github.luben.zstd.RecyclingBufferPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -56,14 +59,18 @@ public class TestZstandardCodec {
public void testZstdCodec() throws IOException {
ZstandardCodec codec = new ZstandardCodec();
Configuration conf = new Configuration();
boolean[] pools = {false, true};
int[] levels = {1, 4, 7, 10, 13, 16, 19, 22};
int[] dataSizes = {0, 1, 10, 1024, 1024 * 1024};

for (int i = 0; i < levels.length; i++) {
conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]);
codec.setConf(conf);
for (int j = 0; j < dataSizes.length; j++) {
testZstd(codec, dataSizes[j]);
for (boolean pool: pools) {
for (int i = 0; i < levels.length; i++) {
conf.setBoolean(ZstandardCodec.PARQUET_COMPRESS_ZSTD_BUFFERPOOL_ENABLED, pool);
conf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, levels[i]);
codec.setConf(conf);
for (int j = 0; j < dataSizes.length; j++) {
testZstd(codec, dataSizes[j]);
}
}
}
}
Expand Down

0 comments on commit 279255d

Please sign in to comment.