Skip to content

Commit

Permalink
Add native Lz4, Snappy, and Zstd
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jul 2, 2024
1 parent 8a0851c commit a73d3e2
Show file tree
Hide file tree
Showing 52 changed files with 1,637 additions and 391 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<version>3.3.5-1</version>
<version>3.3.5-3</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/airlift/compress/hadoop/HadoopStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import java.io.OutputStream;
import java.util.List;

/**
* A factory for creating Hadoop compliant input and output streams.
* Implementations of this interface are thread safe.
*/
public interface HadoopStreams
{
String getDefaultFileExtension();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/airlift/compress/lz4/Lz4Codec.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class Lz4Codec
{
public Lz4Codec()
{
super(configuration -> new Lz4HadoopStreams(getBufferSize(configuration)));
super(configuration -> new Lz4HadoopStreams(true, getBufferSize(configuration)));
}

private static int getBufferSize(Optional<Configuration> configuration)
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/io/airlift/compress/lz4/Lz4Compressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import io.airlift.compress.Compressor;

import java.lang.foreign.MemorySegment;

public sealed interface Lz4Compressor
extends Compressor
permits Lz4JavaCompressor, Lz4NativeCompressor
{
int compress(MemorySegment input, MemorySegment output);
}
25 changes: 25 additions & 0 deletions src/main/java/io/airlift/compress/lz4/Lz4Decompressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import io.airlift.compress.Decompressor;

import java.lang.foreign.MemorySegment;

public sealed interface Lz4Decompressor
extends Decompressor
permits Lz4JavaDecompressor, Lz4NativeDecompressor
{
int decompress(MemorySegment input, MemorySegment output);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
class Lz4HadoopInputStream
extends HadoopInputStream
{
private final Lz4JavaDecompressor decompressor = new Lz4JavaDecompressor();
private final Lz4Decompressor decompressor;
private final InputStream in;
private final byte[] uncompressedChunk;

Expand All @@ -35,8 +35,9 @@ class Lz4HadoopInputStream

private byte[] compressed = new byte[0];

public Lz4HadoopInputStream(InputStream in, int maxUncompressedLength)
public Lz4HadoopInputStream(Lz4Decompressor decompressor, InputStream in, int maxUncompressedLength)
{
this.decompressor = requireNonNull(decompressor, "decompressor is null");
this.in = requireNonNull(in, "in is null");
// over allocate buffer which makes decompression easier
uncompressedChunk = new byte[maxUncompressedLength + SIZE_OF_LONG];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
class Lz4HadoopOutputStream
extends HadoopOutputStream
{
private final Lz4JavaCompressor compressor = new Lz4JavaCompressor();
private final Lz4Compressor compressor;

private final OutputStream out;
private final byte[] inputBuffer;
Expand All @@ -33,8 +33,9 @@ class Lz4HadoopOutputStream

private final byte[] outputBuffer;

public Lz4HadoopOutputStream(OutputStream out, int bufferSize)
public Lz4HadoopOutputStream(Lz4Compressor compressor, OutputStream out, int bufferSize)
{
this.compressor = requireNonNull(compressor, "compressor is null");
this.out = requireNonNull(out, "out is null");
inputBuffer = new byte[bufferSize];
// leave extra space free at end of buffers to make compression (slightly) faster
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/airlift/compress/lz4/Lz4HadoopStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ public class Lz4HadoopStreams
implements HadoopStreams
{
private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256 * 1024;
private final boolean useNative;
private final int bufferSize;

public Lz4HadoopStreams()
{
this(DEFAULT_OUTPUT_BUFFER_SIZE);
this(true, DEFAULT_OUTPUT_BUFFER_SIZE);
}

public Lz4HadoopStreams(int bufferSize)
public Lz4HadoopStreams(boolean useNative, int bufferSize)
{
this.useNative = useNative && Lz4Native.isEnabled();
this.bufferSize = bufferSize;
}

Expand All @@ -54,12 +56,14 @@ public List<String> getHadoopCodecName()
@Override
public HadoopInputStream createInputStream(InputStream in)
{
return new Lz4HadoopInputStream(in, bufferSize);
Lz4Decompressor decompressor = useNative ? new Lz4NativeDecompressor() : new Lz4JavaDecompressor();
return new Lz4HadoopInputStream(decompressor, in, bufferSize);
}

@Override
public HadoopOutputStream createOutputStream(OutputStream out)
{
return new Lz4HadoopOutputStream(out, bufferSize);
Lz4Compressor compressor = useNative ? new Lz4NativeCompressor() : new Lz4JavaCompressor();
return new Lz4HadoopOutputStream(compressor, out, bufferSize);
}
}
6 changes: 2 additions & 4 deletions src/main/java/io/airlift/compress/lz4/Lz4JavaCompressor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
package io.airlift.compress.lz4;

import io.airlift.compress.Compressor;

import java.lang.foreign.MemorySegment;

import static io.airlift.compress.lz4.Lz4RawCompressor.MAX_TABLE_SIZE;
Expand All @@ -29,8 +27,8 @@
/**
* This class is not thread-safe
*/
public class Lz4JavaCompressor
implements Compressor
public final class Lz4JavaCompressor
implements Lz4Compressor
{
private final int[] table = new int[MAX_TABLE_SIZE];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.airlift.compress.lz4;

import io.airlift.compress.Decompressor;
import io.airlift.compress.MalformedInputException;

import java.lang.foreign.MemorySegment;
Expand All @@ -26,8 +25,8 @@
import static java.util.Objects.requireNonNull;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;

public class Lz4JavaDecompressor
implements Decompressor
public final class Lz4JavaDecompressor
implements Lz4Decompressor
{
@Override
public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
Expand Down
146 changes: 146 additions & 0 deletions src/main/java/io/airlift/compress/lz4/Lz4Native.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import io.airlift.compress.internal.NativeLoader.Symbols;
import io.airlift.compress.internal.NativeSignature;

import java.lang.foreign.MemorySegment;
import java.lang.invoke.MethodHandle;
import java.util.Optional;

import static io.airlift.compress.internal.NativeLoader.loadSymbols;
import static java.lang.invoke.MethodHandles.lookup;

final class Lz4Native
{
private Lz4Native() {}

private record MethodHandles(
@NativeSignature(name = "LZ4_compressBound", returnType = int.class, argumentTypes = int.class)
MethodHandle maxCompressedLength,
@NativeSignature(name = "LZ4_compress_fast", returnType = int.class, argumentTypes = {MemorySegment.class, MemorySegment.class, int.class, int.class, int.class})
MethodHandle compress,
@NativeSignature(name = "LZ4_compress_fast_extState", returnType = int.class, argumentTypes = {MemorySegment.class, MemorySegment.class, MemorySegment.class, int.class, int.class, int.class})
MethodHandle compressExternalState,
@NativeSignature(name = "LZ4_decompress_safe", returnType = int.class, argumentTypes = {MemorySegment.class, MemorySegment.class, int.class, int.class})
MethodHandle decompress,
@NativeSignature(name = "LZ4_sizeofState", returnType = int.class, argumentTypes = {})
MethodHandle sizeofState) {}

private static final Optional<LinkageError> LINKAGE_ERROR;
private static final MethodHandle MAX_COMPRESSED_LENGTH_METHOD;
private static final MethodHandle COMPRESS_METHOD;
private static final MethodHandle COMPRESS_EXTERNAL_STATE_METHOD;
private static final MethodHandle DECOMPRESS_METHOD;

// Defined in lz4.h: https://github.com/lz4/lz4/blob/v1.9.4/lib/lz4.c#L51
public static final int DEFAULT_ACCELERATION = 1;
public static final int STATE_SIZE;

static {
Symbols<MethodHandles> symbols = loadSymbols("lz4", MethodHandles.class, lookup());
LINKAGE_ERROR = symbols.linkageError();
MethodHandles methodHandles = symbols.symbols();
MAX_COMPRESSED_LENGTH_METHOD = methodHandles.maxCompressedLength();
COMPRESS_METHOD = methodHandles.compress();
COMPRESS_EXTERNAL_STATE_METHOD = methodHandles.compressExternalState();
DECOMPRESS_METHOD = methodHandles.decompress();

if (LINKAGE_ERROR.isEmpty()) {
try {
STATE_SIZE = (int) methodHandles.sizeofState().invokeExact();
}
catch (Throwable e) {
throw new ExceptionInInitializerError(e);
}
}
else {
STATE_SIZE = -1;
}
}

public static boolean isEnabled()
{
return LINKAGE_ERROR.isEmpty();
}

public static void verifyEnabled()
{
if (LINKAGE_ERROR.isPresent()) {
throw new IllegalStateException("Lz4 native library is not enabled", LINKAGE_ERROR.get());
}
}

public static int maxCompressedLength(int inputLength)
{
try {
return (int) MAX_COMPRESSED_LENGTH_METHOD.invokeExact(inputLength);
}
catch (Throwable e) {
throw new AssertionError("should not reach here", e);
}
}

public static int compress(MemorySegment input, int inputLength, MemorySegment compressed, int compressedLength, int acceleration)
{
int result;
try {
result = (int) COMPRESS_METHOD.invokeExact(input, compressed, inputLength, compressedLength, acceleration);
}
catch (Throwable e) {
throw new AssertionError("should not reach here", e);
}

// LZ4_compress_default returns 0 on error, but disallow negative values also
if (result <= 0) {
throw new IllegalArgumentException("Unknown error occurred during compression: result=" + result);
}
return result;
}

public static int compress(MemorySegment input, int inputLength, MemorySegment compressed, int compressedLength, int acceleration, MemorySegment state)
{
int result;
try {
result = (int) COMPRESS_EXTERNAL_STATE_METHOD.invokeExact(state, input, compressed, inputLength, compressedLength, acceleration);
}
catch (Throwable e) {
throw new AssertionError("should not reach here", e);
}

// LZ4_compress_default returns 0 on error, but disallow negative values also
if (result <= 0) {
throw new IllegalArgumentException("Unknown error occurred during compression: result=" + result);
}
return result;
}

public static int decompress(MemorySegment compressed, int compressedLength, MemorySegment output, int outputLength)
{
int result;
try {
result = (int) DECOMPRESS_METHOD.invokeExact(compressed, output, compressedLength, outputLength);
}
catch (Throwable e) {
throw new AssertionError("should not reach here", e);
}

// negative return values indicate errors
if (result < 0) {
throw new IllegalArgumentException("Unknown error occurred during decompression: result=" + result);
}
return result;
}
}
Loading

0 comments on commit a73d3e2

Please sign in to comment.