diff --git a/java/src/main/java/ai/rapids/cudf/CloseableArray.java b/java/src/main/java/ai/rapids/cudf/CloseableArray.java
new file mode 100644
index 00000000000..5c75f2378e8
--- /dev/null
+++ b/java/src/main/java/ai/rapids/cudf/CloseableArray.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ai.rapids.cudf;
+
+/** Utility class that wraps an array of closeable instances and can be closed */
+public class CloseableArray<T extends AutoCloseable> implements AutoCloseable {
+  private T[] array;
+
+  public static <T extends AutoCloseable> CloseableArray<T> wrap(T[] array) {
+    return new CloseableArray<T>(array);
+  }
+
+  CloseableArray(T[] array) {
+    this.array = array;
+  }
+
+  public int size() {
+    return array.length;
+  }
+
+  public T get(int i) {
+    return array[i];
+  }
+
+  public T set(int i, T obj) {
+    array[i] = obj;
+    return obj;
+  }
+
+  public T[] getArray() {
+    return array;
+  }
+
+  public T[] release() {
+    T[] result = array;
+    array = null;
+    return result;
+  }
+
+  public void closeAt(int i) {
+    try {
+      T toClose = array[i];
+      array[i] = null;
+      toClose.close();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    close(null);
+  }
+
+  public void close(Exception pendingError) {
+    if (array == null) {
+      return;
+    }
+    T[] toClose = array;
+    array = null;
+    RuntimeException error = null;
+    if (pendingError instanceof RuntimeException) {
+      error = (RuntimeException) pendingError;
+    } else if (pendingError != null) {
+      error = new RuntimeException(pendingError);
+    }
+    for (T obj: toClose) {
+      if (obj != null) {
+        try {
+          obj.close();
+        } catch (RuntimeException e) {
+          if (error != null) {
+            error.addSuppressed(e);
+          } else {
+            error = e;
+          }
+        } catch (Exception e) {
+          if (error != null) {
+            error.addSuppressed(e);
+          } else {
+            error = new RuntimeException(e);
+          }
+        }
+      }
+    }
+    if (error != null) {
+      throw error;
+    }
+  }
+}
diff --git a/java/src/main/java/ai/rapids/cudf/Cuda.java b/java/src/main/java/ai/rapids/cudf/Cuda.java
index 02e4d32617d..bb2d6dbde7d 100755
--- a/java/src/main/java/ai/rapids/cudf/Cuda.java
+++ b/java/src/main/java/ai/rapids/cudf/Cuda.java
@@ -15,6 +15,9 @@
  */
 package ai.rapids.cudf;
 
+import ai.rapids.cudf.NvtxColor;
+import ai.rapids.cudf.NvtxRange;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -521,4 +524,27 @@ public static synchronized boolean isEnvCompatibleForTesting() {
    * Whether per-thread default stream is enabled.
    */
   public static native boolean isPtdsEnabled();
+
+  /**
+   * Copy data from multiple device buffer sources to multiple device buffer destinations.
+   * For each buffer to copy there is a corresponding entry in the destination address, source
+   * address, and copy size vectors.
+ * @param destAddrs vector of device destination addresses + * @param srcAddrs vector of device source addresses + * @param copySizes vector of copy sizes + * @param stream CUDA stream to use for the copy + */ + public static void multiBufferCopyAsync(long [] destAddrs, + long [] srcAddrs, + long [] copySizes, + Stream stream) { + // Temporary sub-par stand-in for a multi-buffer copy CUDA kernel + assert(destAddrs.length == srcAddrs.length); + assert(copySizes.length == destAddrs.length); + try (NvtxRange copyRange = new NvtxRange("multiBufferCopyAsync", NvtxColor.CYAN)){ + for (int i = 0; i < destAddrs.length; i++) { + asyncMemcpy(destAddrs[i], srcAddrs[i], copySizes[i], CudaMemcpyKind.DEVICE_TO_DEVICE, stream); + } + } + } } diff --git a/java/src/main/java/ai/rapids/cudf/MemoryCleaner.java b/java/src/main/java/ai/rapids/cudf/MemoryCleaner.java index a936d4830ee..05545807bb6 100644 --- a/java/src/main/java/ai/rapids/cudf/MemoryCleaner.java +++ b/java/src/main/java/ai/rapids/cudf/MemoryCleaner.java @@ -19,8 +19,6 @@ package ai.rapids.cudf; import ai.rapids.cudf.ast.CompiledExpression; -import ai.rapids.cudf.nvcomp.BatchedLZ4Decompressor; -import ai.rapids.cudf.nvcomp.Decompressor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,16 +246,6 @@ static void register(Cuda.Event event, Cleaner cleaner) { all.add(new CleanerWeakReference(event, cleaner, collected, false)); } - public static void register(Decompressor.Metadata metadata, Cleaner cleaner) { - // It is now registered... - all.add(new CleanerWeakReference(metadata, cleaner, collected, false)); - } - - public static void register(BatchedLZ4Decompressor.BatchedMetadata metadata, Cleaner cleaner) { - // It is now registered... - all.add(new CleanerWeakReference(metadata, cleaner, collected, false)); - } - static void register(CuFileDriver driver, Cleaner cleaner) { // It is now registered... all.add(new CleanerWeakReference(driver, cleaner, collected, false)); @@ -324,4 +312,4 @@ public String toString() { + "\n"; } } -} \ No newline at end of file +} diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java index 88b20414b0c..1ab3b97945d 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
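For context on the compressor hunk that follows: the batched LZ4 compressor becomes an instance configured with a chunk size and a target intermediate buffer size, replacing the earlier static getTempSize/getOutputSizes/compress helpers. Below is a minimal usage sketch of the reworked API; it is not part of the patch, and the class name, 64 KiB chunk size, and 8 MiB intermediate buffer size are illustrative choices.

import ai.rapids.cudf.BaseDeviceMemoryBuffer;
import ai.rapids.cudf.Cuda;
import ai.rapids.cudf.DeviceMemoryBuffer;
import ai.rapids.cudf.nvcomp.BatchedLZ4Compressor;

class BatchedLZ4CompressExample {
  static DeviceMemoryBuffer[] compressAll(BaseDeviceMemoryBuffer[] inputs, Cuda.Stream stream) {
    // 64 KiB chunks and roughly 8 MiB intermediate buffers; both values are illustrative.
    BatchedLZ4Compressor compressor = new BatchedLZ4Compressor(64 * 1024, 8 * 1024 * 1024);
    // compress() takes ownership of (and closes) the input buffers and returns one
    // compressed buffer per input, with per-chunk size metadata at the front of each.
    return compressor.compress(inputs, stream);
  }
}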
@@ -17,199 +17,302 @@ package ai.rapids.cudf.nvcomp; import ai.rapids.cudf.BaseDeviceMemoryBuffer; +import ai.rapids.cudf.CloseableArray; import ai.rapids.cudf.Cuda; import ai.rapids.cudf.DeviceMemoryBuffer; import ai.rapids.cudf.HostMemoryBuffer; +import ai.rapids.cudf.MemoryBuffer; +import ai.rapids.cudf.NvtxColor; +import ai.rapids.cudf.NvtxRange; + +import java.util.Arrays; /** Multi-buffer LZ4 compressor */ public class BatchedLZ4Compressor { - /** Describes a batched compression result */ - public static class BatchedCompressionResult { - private final DeviceMemoryBuffer[] compressedBuffers; - private final long[] compressedSizes; - - BatchedCompressionResult(DeviceMemoryBuffer[] buffers, long[] sizes) { - this.compressedBuffers = buffers; - this.compressedSizes = sizes; - } + static final long MAX_CHUNK_SIZE = 16777216; // in bytes + // each chunk has a 64-bit integer value as metadata containing the compressed size + static final long METADATA_BYTES_PER_CHUNK = 8; - /** - * Get the output compressed buffers corresponding to the input buffers. - * Note that the buffers are likely larger than required to store the compressed data. - */ - public DeviceMemoryBuffer[] getCompressedBuffers() { - return compressedBuffers; - } - - /** Get the corresponding amount of compressed data in each output buffer. */ - public long[] getCompressedSizes() { - return compressedSizes; - } - } + private final long chunkSize; + private final long targetIntermediateBufferSize; + private final long maxOutputChunkSize; /** - * Get the amount of temporary storage space required to compress a batch of buffers. - * @param inputs batch of data buffers to be individually compressed - * @param chunkSize compression chunk size to use - * @return amount in bytes of temporary storage space required to compress the batch + * Construct a batched LZ4 compressor instance + * @param chunkSize maximum amount of uncompressed data to compress as a single chunk. Inputs + * larger than this will be compressed in multiple chunks. + * @param targetIntermediateBufferSize desired maximum size of intermediate device buffers + * used during compression. */ - public static long getTempSize(BaseDeviceMemoryBuffer[] inputs, long chunkSize) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); - } - int numBuffers = inputs.length; - long[] inputAddrs = new long[numBuffers]; - long[] inputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = inputs[i]; - inputAddrs[i] = buffer.getAddress(); - inputSizes[i] = buffer.getLength(); - } - return NvcompJni.batchedLZ4CompressGetTempSize(inputAddrs, inputSizes, chunkSize); + public BatchedLZ4Compressor(long chunkSize, long targetIntermediateBufferSize) { + validateChunkSize(chunkSize); + this.chunkSize = chunkSize; + this.maxOutputChunkSize = NvcompJni.batchedLZ4CompressGetMaxOutputChunkSize(chunkSize); + assert maxOutputChunkSize < Integer.MAX_VALUE; + this.targetIntermediateBufferSize = Math.max(targetIntermediateBufferSize, maxOutputChunkSize); } /** - * Get the amount of output storage space required to compress a batch of buffers. - * @param inputs batch of data buffers to be individually compressed - * @param chunkSize compression chunk size to use - * @param tempBuffer temporary storage space - * @return amount in bytes of output storage space corresponding to each input buffer in the batch + * Compress a batch of buffers with LZ4. The input buffers will be closed. 
+ * @param origInputs buffers to compress + * @param stream CUDA stream to use + * @return compressed buffers corresponding to the input buffers */ - public static long[] getOutputSizes(BaseDeviceMemoryBuffer[] inputs, long chunkSize, - BaseDeviceMemoryBuffer tempBuffer) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); + public DeviceMemoryBuffer[] compress(BaseDeviceMemoryBuffer[] origInputs, Cuda.Stream stream) { + try (CloseableArray inputs = CloseableArray.wrap(origInputs)) { + if (chunkSize <= 0) { + throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); + } + final int numInputs = inputs.size(); + if (numInputs == 0) { + return new DeviceMemoryBuffer[0]; + } + + // Each buffer is broken up into chunkSize chunks for compression. Calculate how many + // chunks are needed for each input buffer. + int[] chunksPerInput = new int[numInputs]; + int numChunks = 0; + for (int i = 0; i < numInputs; i++) { + BaseDeviceMemoryBuffer buffer = inputs.get(i); + int numBufferChunks = getNumChunksInBuffer(buffer); + chunksPerInput[i] = numBufferChunks; + numChunks += numBufferChunks; + } + + // Allocate buffers for each chunk and generate parallel lists of chunk source addresses, + // chunk destination addresses, and sizes. + try (CloseableArray compressedBuffers = + allocCompressedBuffers(numChunks, stream); + DeviceMemoryBuffer compressedChunkSizes = + DeviceMemoryBuffer.allocate(numChunks * 8L, stream)) { + long[] inputChunkAddrs = new long[numChunks]; + long[] inputChunkSizes = new long[numChunks]; + long[] outputChunkAddrs = new long[numChunks]; + buildAddrsAndSizes(inputs, inputChunkAddrs, inputChunkSizes, + compressedBuffers, outputChunkAddrs); + + long[] outputChunkSizes; + final long tempBufferSize = NvcompJni.batchedLZ4CompressGetTempSize(numChunks, chunkSize); + try (DeviceMemoryBuffer addrsAndSizes = + putAddrsAndSizesOnDevice(inputChunkAddrs, inputChunkSizes, outputChunkAddrs, stream); + DeviceMemoryBuffer tempBuffer = DeviceMemoryBuffer.allocate(tempBufferSize, stream)) { + final long devOutputAddrsPtr = addrsAndSizes.getAddress() + numChunks * 8L; + final long devInputSizesPtr = devOutputAddrsPtr + numChunks * 8L; + NvcompJni.batchedLZ4CompressAsync( + addrsAndSizes.getAddress(), + devInputSizesPtr, + chunkSize, + numChunks, + tempBuffer.getAddress(), + tempBufferSize, + devOutputAddrsPtr, + compressedChunkSizes.getAddress(), + stream.getStream()); + } + + // Synchronously copy the resulting compressed sizes per chunk. 
+ outputChunkSizes = getOutputChunkSizes(compressedChunkSizes, stream); + + // inputs are no longer needed at this point, so free them early + inputs.close(); + + // Combine compressed chunks into output buffers corresponding to each original input + return stitchOutput(chunksPerInput, compressedChunkSizes, outputChunkAddrs, + outputChunkSizes, stream); + } } - int numBuffers = inputs.length; - long[] inputAddrs = new long[numBuffers]; - long[] inputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = inputs[i]; - inputAddrs[i] = buffer.getAddress(); - inputSizes[i] = buffer.getLength(); + } + + static void validateChunkSize(long chunkSize) { + if (chunkSize <= 0 || chunkSize > MAX_CHUNK_SIZE) { + throw new IllegalArgumentException("Invalid chunk size: " + chunkSize + " Max chunk size is: " + + MAX_CHUNK_SIZE + " bytes"); } - return NvcompJni.batchedLZ4CompressGetOutputSize(inputAddrs, inputSizes, chunkSize, - tempBuffer.getAddress(), tempBuffer.getLength()); } - /** - * Calculates the minimum size in bytes necessary to store the compressed output sizes - * when performing an asynchronous batch compression. - * @param numBuffers number of buffers in the batch - * @return minimum size of the compressed output sizes buffer needed - */ - public static long getCompressedSizesBufferSize(int numBuffers) { - // Each compressed size value is a 64-bit long - return numBuffers * 8; + private static long ceilingDivide(long x, long y) { + return (x + y - 1) / y; } - /** - * Asynchronously compress a batch of input buffers. The compressed size output buffer must be - * pinned memory for this operation to be truly asynchronous. Note that the caller must - * synchronize on the specified CUDA stream in order to safely examine the compressed output - * sizes! 
- * @param compressedSizesOutputBuffer host memory where the compressed output size will be stored - * @param inputs buffers to compress - * @param chunkSize type of data within each buffer - * @param tempBuffer compression chunk size to use - * @param outputs output buffers that will contain the compressed results - * @param stream CUDA stream to use - */ - public static void compressAsync(HostMemoryBuffer compressedSizesOutputBuffer, - BaseDeviceMemoryBuffer[] inputs, long chunkSize, - BaseDeviceMemoryBuffer tempBuffer, - BaseDeviceMemoryBuffer[] outputs, Cuda.Stream stream) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); - } - int numBuffers = inputs.length; - if (outputs.length != numBuffers) { - throw new IllegalArgumentException("buffer count mismatch, " + numBuffers + " inputs and " + - outputs.length + " outputs"); - } - if (compressedSizesOutputBuffer.getLength() < getCompressedSizesBufferSize(numBuffers)) { - throw new IllegalArgumentException("compressed output size buffer must be able to hold " + - "at least 8 bytes per buffer, size is only " + compressedSizesOutputBuffer.getLength()); - } + private int getNumChunksInBuffer(MemoryBuffer buffer) { + return (int) ceilingDivide(buffer.getLength(), chunkSize); + } - long[] inputAddrs = new long[numBuffers]; - long[] inputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = inputs[i]; - inputAddrs[i] = buffer.getAddress(); - inputSizes[i] = buffer.getLength(); + private CloseableArray allocCompressedBuffers(long numChunks, + Cuda.Stream stream) { + final long chunksPerBuffer = targetIntermediateBufferSize / maxOutputChunkSize; + final long numBuffers = ceilingDivide(numChunks, chunksPerBuffer); + if (numBuffers > Integer.MAX_VALUE) { + throw new IllegalStateException("Too many chunks"); } - - long[] outputAddrs = new long[numBuffers]; - long[] outputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = outputs[i]; - outputAddrs[i] = buffer.getAddress(); - outputSizes[i] = buffer.getLength(); + try (NvtxRange range = new NvtxRange("allocCompressedBuffers", NvtxColor.YELLOW)) { + CloseableArray buffers = CloseableArray.wrap( + new DeviceMemoryBuffer[(int) numBuffers]); + try { + // allocate all of the max-chunks intermediate compressed buffers + for (int i = 0; i < buffers.size() - 1; ++i) { + buffers.set(i, DeviceMemoryBuffer.allocate(chunksPerBuffer * maxOutputChunkSize, stream)); + } + // allocate the tail intermediate compressed buffer that may be smaller than the others + buffers.set(buffers.size() - 1, DeviceMemoryBuffer.allocate( + (numChunks - chunksPerBuffer * (buffers.size() - 1)) * maxOutputChunkSize, stream)); + return buffers; + } catch (Exception e) { + buffers.close(e); + throw e; + } } - - NvcompJni.batchedLZ4CompressAsync(compressedSizesOutputBuffer.getAddress(), - inputAddrs, inputSizes, chunkSize, tempBuffer.getAddress(), tempBuffer.getLength(), - outputAddrs, outputSizes, stream.getStream()); } - /** - * Compress a batch of buffers with LZ4 - * @param inputs buffers to compress - * @param chunkSize compression chunk size to use - * @param stream CUDA stream to use - * @return compression results containing the corresponding output buffer and compressed data size - * for each input buffer - */ - public static BatchedCompressionResult compress(BaseDeviceMemoryBuffer[] inputs, long chunkSize, - Cuda.Stream stream) { - if (chunkSize <= 0) { - throw new 
IllegalArgumentException("Illegal chunk size: " + chunkSize); + // Fill in the inputChunkAddrs, inputChunkSizes, and outputChunkAddrs arrays to point + // into the chunks in the input and output buffers. + private void buildAddrsAndSizes(CloseableArray inputs, + long[] inputChunkAddrs, + long[] inputChunkSizes, + CloseableArray compressedBuffers, + long[] outputChunkAddrs) { + // setup the input addresses and sizes + int chunkIdx = 0; + for (BaseDeviceMemoryBuffer input : inputs.getArray()) { + final int numChunksInBuffer = getNumChunksInBuffer(input); + for (int i = 0; i < numChunksInBuffer; i++) { + inputChunkAddrs[chunkIdx] = input.getAddress() + i * chunkSize; + inputChunkSizes[chunkIdx] = (i != numChunksInBuffer - 1) ? chunkSize + : (input.getLength() - (long) i * chunkSize); + ++chunkIdx; + } } - int numBuffers = inputs.length; - long[] inputAddrs = new long[numBuffers]; - long[] inputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = inputs[i]; - inputAddrs[i] = buffer.getAddress(); - inputSizes[i] = buffer.getLength(); + assert chunkIdx == inputChunkAddrs.length; + assert chunkIdx == inputChunkSizes.length; + + // setup output addresses + chunkIdx = 0; + for (DeviceMemoryBuffer buffer : compressedBuffers.getArray()) { + assert buffer.getLength() % maxOutputChunkSize == 0; + long numChunksInBuffer = buffer.getLength() / maxOutputChunkSize; + long baseAddr = buffer.getAddress(); + for (int i = 0; i < numChunksInBuffer; i++) { + outputChunkAddrs[chunkIdx++] = baseAddr + i * maxOutputChunkSize; + } } + assert chunkIdx == outputChunkAddrs.length; + } - DeviceMemoryBuffer[] outputBuffers = new DeviceMemoryBuffer[numBuffers]; - try { - long tempSize = NvcompJni.batchedLZ4CompressGetTempSize(inputAddrs, inputSizes, chunkSize); - try (DeviceMemoryBuffer tempBuffer = DeviceMemoryBuffer.allocate(tempSize)) { - long[] outputSizes = NvcompJni.batchedLZ4CompressGetOutputSize(inputAddrs, inputSizes, - chunkSize, tempBuffer.getAddress(), tempBuffer.getLength()); - long[] outputAddrs = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - DeviceMemoryBuffer buffer = DeviceMemoryBuffer.allocate(outputSizes[i]); - outputBuffers[i] = buffer; - outputAddrs[i] = buffer.getAddress(); + // Write input addresses, output addresses and sizes contiguously into a DeviceMemoryBuffer. 
+ private DeviceMemoryBuffer putAddrsAndSizesOnDevice(long[] inputAddrs, + long[] inputSizes, + long[] outputAddrs, + Cuda.Stream stream) { + final long totalSize = inputAddrs.length * 8L * 3; // space for input, output, and size arrays + final long outputAddrsOffset = inputAddrs.length * 8L; + final long sizesOffset = outputAddrsOffset + inputAddrs.length * 8L; + try (NvtxRange range = new NvtxRange("putAddrsAndSizesOnDevice", NvtxColor.YELLOW)) { + try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(totalSize); + DeviceMemoryBuffer result = DeviceMemoryBuffer.allocate(totalSize)) { + hostbuf.setLongs(0, inputAddrs, 0, inputAddrs.length); + hostbuf.setLongs(outputAddrsOffset, outputAddrs, 0, outputAddrs.length); + for (int i = 0; i < inputSizes.length; i++) { + hostbuf.setLong(sizesOffset + i * 8L, inputSizes[i]); } + result.copyFromHostBuffer(hostbuf, stream); + result.incRefCount(); + return result; + } + } + } - long compressedSizesBufferSize = getCompressedSizesBufferSize(numBuffers); - try (HostMemoryBuffer compressedSizesBuffer = - HostMemoryBuffer.allocate(compressedSizesBufferSize)) { - NvcompJni.batchedLZ4CompressAsync(compressedSizesBuffer.getAddress(), - inputAddrs, inputSizes, chunkSize, - tempBuffer.getAddress(), tempBuffer.getLength(), - outputAddrs, outputSizes, stream.getStream()); - stream.sync(); - long[] compressedSizes = new long[numBuffers]; - compressedSizesBuffer.getLongs(compressedSizes, 0, 0, numBuffers); - return new BatchedCompressionResult(outputBuffers, compressedSizes); + // Synchronously copy the resulting compressed sizes from device memory to host memory. + private long[] getOutputChunkSizes(BaseDeviceMemoryBuffer devChunkSizes, Cuda.Stream stream) { + try (NvtxRange range = new NvtxRange("getOutputChunkSizes", NvtxColor.YELLOW)) { + try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(devChunkSizes.getLength())) { + hostbuf.copyFromDeviceBuffer(devChunkSizes, stream); + int numChunks = (int) (devChunkSizes.getLength() / 8); + long[] result = new long[numChunks]; + for (int i = 0; i < numChunks; i++) { + long size = hostbuf.getLong(i * 8L); + assert size < Integer.MAX_VALUE : "output size is too big"; + result[i] = size; } + return result; } - } catch (Throwable t) { - for (DeviceMemoryBuffer buffer : outputBuffers) { - if (buffer != null) { - buffer.close(); + } + } + + // Stitch together the individual chunks into the result buffers. + // Each result buffer has metadata at the beginning, followed by compressed chunks. + // This is done by building up parallel lists of source addr, dest addr and size and + // then calling multiBufferCopyAsync() + private DeviceMemoryBuffer[] stitchOutput(int[] chunksPerInput, + DeviceMemoryBuffer compressedChunkSizes, + long[] outputChunkAddrs, + long[] outputChunkSizes, + Cuda.Stream stream) { + try (NvtxRange range = new NvtxRange("stitchOutput", NvtxColor.YELLOW)) { + final int numOutputs = chunksPerInput.length; + final long chunkSizesAddr = compressedChunkSizes.getAddress(); + long[] outputBufferSizes = calcOutputBufferSizes(chunksPerInput, outputChunkSizes); + try (CloseableArray outputs = + CloseableArray.wrap(new DeviceMemoryBuffer[numOutputs])) { + // Each chunk needs to be copied, and each output needs a copy of the + // compressed chunk size vector representing the metadata. 
+ final int totalBuffersToCopy = numOutputs + outputChunkAddrs.length; + long[] destAddrs = new long[totalBuffersToCopy]; + long[] srcAddrs = new long[totalBuffersToCopy]; + long[] sizes = new long[totalBuffersToCopy]; + int copyBufferIdx = 0; + int chunkIdx = 0; + for (int outputIdx = 0; outputIdx < numOutputs; outputIdx++) { + DeviceMemoryBuffer outputBuffer = DeviceMemoryBuffer.allocate(outputBufferSizes[outputIdx]); + final long outputBufferAddr = outputBuffer.getAddress(); + outputs.set(outputIdx, outputBuffer); + final long numChunks = chunksPerInput[outputIdx]; + final long metadataSize = numChunks * METADATA_BYTES_PER_CHUNK; + + // setup a copy of the metadata at the front of the output buffer + srcAddrs[copyBufferIdx] = chunkSizesAddr + chunkIdx * 8; + destAddrs[copyBufferIdx] = outputBufferAddr; + sizes[copyBufferIdx] = metadataSize; + ++copyBufferIdx; + + // setup copies of the compressed chunks for this output buffer + long nextChunkAddr = outputBufferAddr + metadataSize; + for (int i = 0; i < numChunks; ++i) { + srcAddrs[copyBufferIdx] = outputChunkAddrs[chunkIdx]; + destAddrs[copyBufferIdx] = nextChunkAddr; + final long chunkSize = outputChunkSizes[chunkIdx]; + sizes[copyBufferIdx] = chunkSize; + copyBufferIdx++; + chunkIdx++; + nextChunkAddr += chunkSize; + } } + assert copyBufferIdx == totalBuffersToCopy; + assert chunkIdx == outputChunkAddrs.length; + assert chunkIdx == outputChunkSizes.length; + + Cuda.multiBufferCopyAsync(destAddrs, srcAddrs, sizes, stream); + return outputs.release(); } - throw t; } } - + // Calculate the list of sizes for each output buffer (metadata plus size of compressed chunks) + private long[] calcOutputBufferSizes(int[] chunksPerInput, + long[] outputChunkSizes) { + long[] sizes = new long[chunksPerInput.length]; + int chunkIdx = 0; + for (int i = 0; i < sizes.length; i++) { + final int chunksInBuffer = chunksPerInput[i]; + final int chunkEndIdx = chunkIdx + chunksInBuffer; + // metadata stored in front of compressed data + long bufferSize = METADATA_BYTES_PER_CHUNK * chunksInBuffer; + // add in the compressed chunk sizes to get the total size + while (chunkIdx < chunkEndIdx) { + bufferSize += outputChunkSizes[chunkIdx++]; + } + sizes[i] = bufferSize; + } + assert chunkIdx == outputChunkSizes.length; + return sizes; + } } diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java index 61969db4fb4..40ad4d5e9ed 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
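For context on the decompressor hunk that follows: instead of fetching an opaque metadata object and querying temp/output sizes, callers now pass the chunk size used at compression time along with output buffers already sized to the exact uncompressed lengths. A sketch of how the new entry point might be called, assuming the caller tracked those sizes; the wrapper class name and 64 KiB chunk size are illustrative.

import ai.rapids.cudf.BaseDeviceMemoryBuffer;
import ai.rapids.cudf.Cuda;
import ai.rapids.cudf.nvcomp.BatchedLZ4Decompressor;

class BatchedLZ4DecompressExample {
  static void decompressAll(BaseDeviceMemoryBuffer[] compressed,
                            BaseDeviceMemoryBuffer[] outputs,
                            Cuda.Stream stream) {
    // The chunk size must match the value used when the data was compressed; 64 KiB is
    // illustrative. The compressed inputs are closed by this call, and each output buffer
    // must already be sized to the exact uncompressed length of its corresponding input.
    BatchedLZ4Decompressor.decompressAsync(64 * 1024, compressed, outputs, stream);
    // Synchronize on the stream before reading the uncompressed outputs.
    stream.sync();
  }
}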
@@ -16,214 +16,183 @@ package ai.rapids.cudf.nvcomp; +import ai.rapids.cudf.CloseableArray; import ai.rapids.cudf.Cuda; import ai.rapids.cudf.BaseDeviceMemoryBuffer; import ai.rapids.cudf.DeviceMemoryBuffer; -import ai.rapids.cudf.MemoryCleaner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import ai.rapids.cudf.HostMemoryBuffer; +import ai.rapids.cudf.NvtxColor; +import ai.rapids.cudf.NvtxRange; + +import java.util.Arrays; /** LZ4 decompressor that operates on multiple input buffers in a batch */ public class BatchedLZ4Decompressor { - private static final Logger log = LoggerFactory.getLogger(Decompressor.class); - - /** - * Get the metadata associated with a batch of compressed buffers - * @param inputs compressed buffers that will be decompressed - * @param stream CUDA stream to use - * @return opaque metadata object - */ - public static BatchedMetadata getMetadata(BaseDeviceMemoryBuffer[] inputs, Cuda.Stream stream) { - long[] inputAddrs = new long[inputs.length]; - long[] inputSizes = new long[inputs.length]; - for (int i = 0; i < inputs.length; ++i) { - BaseDeviceMemoryBuffer buffer = inputs[i]; - inputAddrs[i] = buffer.getAddress(); - inputSizes[i] = buffer.getLength(); - } - return new BatchedMetadata(NvcompJni.batchedLZ4DecompressGetMetadata( - inputAddrs, inputSizes, stream.getStream())); - } - - /** - * Get the amount of temporary storage required to decompress a batch of buffers - * @param metadata metadata retrieved from the compressed buffers - * @return amount in bytes of temporary storage space required to decompress the buffer batch - */ - public static long getTempSize(BatchedMetadata metadata) { - return NvcompJni.batchedLZ4DecompressGetTempSize(metadata.getMetadata()); - } - - /** - * Get the amount of ouptut storage required to decopmress a batch of buffers - * @param metadata metadata retrieved from the compressed buffers - * @param numOutputs number of buffers in the batch - * @return amount in bytes of temporary storage space required to decompress the buffer batch - */ - public static long[] getOutputSizes(BatchedMetadata metadata, int numOutputs) { - return NvcompJni.batchedLZ4DecompressGetOutputSize(metadata.getMetadata(), numOutputs); - } - /** * Asynchronously decompress a batch of buffers - * @param inputs buffers to decompress - * @param tempBuffer temporary buffer - * @param metadata metadata retrieved from the compressed buffers - * @param outputs output buffers that will contain the compressed results - * @param stream CUDA stream to use + * @param chunkSize maximum uncompressed block size, must match value used during compression + * @param origInputs buffers to decompress, will be closed by this operation + * @param outputs output buffers that will contain the compressed results, each must be sized + * to the exact decompressed size of the corresponding input + * @param stream CUDA stream to use */ - public static void decompressAsync(BaseDeviceMemoryBuffer[] inputs, - BaseDeviceMemoryBuffer tempBuffer, BatchedMetadata metadata, - BaseDeviceMemoryBuffer[] outputs, Cuda.Stream stream) { - int numBuffers = inputs.length; - if (outputs.length != numBuffers) { - throw new IllegalArgumentException("buffer count mismatch, " + numBuffers + " inputs and " + - outputs.length + " outputs"); - } + public static void decompressAsync(long chunkSize, + BaseDeviceMemoryBuffer[] origInputs, + BaseDeviceMemoryBuffer[] outputs, + Cuda.Stream stream) { + try (CloseableArray inputs = + CloseableArray.wrap(Arrays.copyOf(origInputs, origInputs.length))) { + 
BatchedLZ4Compressor.validateChunkSize(chunkSize); + if (origInputs.length != outputs.length) { + throw new IllegalArgumentException("number of inputs must match number of outputs"); + } + final int numInputs = inputs.size(); + if (numInputs == 0) { + return; + } - long[] inputAddrs = new long[numBuffers]; - long[] inputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = inputs[i]; - inputAddrs[i] = buffer.getAddress(); - inputSizes[i] = buffer.getLength(); - } + int[] chunksPerInput = new int[numInputs]; + long totalChunks = 0; + for (int i = 0; i < numInputs; i++) { + // use output size to determine number of chunks in the input, as the output buffer + // must be exactly sized to the uncompressed data + BaseDeviceMemoryBuffer buffer = outputs[i]; + int numBufferChunks = getNumChunksInBuffer(chunkSize, buffer); + chunksPerInput[i] = numBufferChunks; + totalChunks += numBufferChunks; + } - long[] outputAddrs = new long[numBuffers]; - long[] outputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = outputs[i]; - outputAddrs[i] = buffer.getAddress(); - outputSizes[i] = buffer.getLength(); + final long tempBufferSize = NvcompJni.batchedLZ4DecompressGetTempSize(totalChunks, chunkSize); + try (DeviceMemoryBuffer devAddrsSizes = + buildAddrsSizesBuffer(chunkSize, totalChunks, inputs.getArray(), chunksPerInput, + outputs, stream); + DeviceMemoryBuffer devTemp = DeviceMemoryBuffer.allocate(tempBufferSize)) { + // buffer containing addresses and sizes contains four vectors of longs in this order: + // - compressed chunk input addresses + // - chunk output buffer addresses + // - compressed chunk sizes + // - uncompressed chunk sizes + final long inputAddrsPtr = devAddrsSizes.getAddress(); + final long outputAddrsPtr = inputAddrsPtr + totalChunks * 8; + final long inputSizesPtr = outputAddrsPtr + totalChunks * 8; + final long outputSizesPtr = inputSizesPtr + totalChunks * 8; + NvcompJni.batchedLZ4DecompressAsync( + inputAddrsPtr, + inputSizesPtr, + outputSizesPtr, + totalChunks, + devTemp.getAddress(), + devTemp.getLength(), + outputAddrsPtr, + stream.getStream()); + } } + } - NvcompJni.batchedLZ4DecompressAsync(inputAddrs, inputSizes, - tempBuffer.getAddress(), tempBuffer.getLength(), metadata.getMetadata(), - outputAddrs, outputSizes, stream.getStream()); + private static int getNumChunksInBuffer(long chunkSize, BaseDeviceMemoryBuffer buffer) { + return (int) ((buffer.getLength() + chunkSize - 1) / chunkSize); } /** - * Asynchronously decompress a batch of buffers - * @param inputs buffers to decompress + * Build a device memory buffer containing four vectors of longs in the following order: + *
+   * <ul>
+   *   <li>compressed chunk input addresses</li>
+   *   <li>uncompressed chunk output addresses</li>
+   *   <li>compressed chunk sizes</li>
+   *   <li>uncompressed chunk sizes</li>
+   * </ul>
+ * Each vector contains as many longs as the number of chunks being decompressed + * @param chunkSize maximum uncompressed size of a chunk + * @param totalChunks total number of chunks to be decompressed + * @param inputs device buffers containing the compressed data + * @param chunksPerInput number of compressed chunks per input buffer + * @param outputs device buffers that will hold the uncompressed output * @param stream CUDA stream to use - * @return output buffers containing compressed data corresponding to the input buffers + * @return device buffer containing address and size vectors */ - public static DeviceMemoryBuffer[] decompressAsync(BaseDeviceMemoryBuffer[] inputs, - Cuda.Stream stream) { - int numBuffers = inputs.length; - long[] inputAddrs = new long[numBuffers]; - long[] inputSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - BaseDeviceMemoryBuffer buffer = inputs[i]; - inputAddrs[i] = buffer.getAddress(); - inputSizes[i] = buffer.getLength(); - } - - long metadata = NvcompJni.batchedLZ4DecompressGetMetadata(inputAddrs, inputSizes, - stream.getStream()); - try { - long[] outputSizes = NvcompJni.batchedLZ4DecompressGetOutputSize(metadata, numBuffers); - long[] outputAddrs = new long[numBuffers]; - DeviceMemoryBuffer[] outputs = new DeviceMemoryBuffer[numBuffers]; - try { - for (int i = 0; i < numBuffers; ++i) { - DeviceMemoryBuffer buffer = DeviceMemoryBuffer.allocate(outputSizes[i]); - outputs[i] = buffer; - outputAddrs[i] = buffer.getAddress(); - } - - long tempSize = NvcompJni.batchedLZ4DecompressGetTempSize(metadata); - try (DeviceMemoryBuffer tempBuffer = DeviceMemoryBuffer.allocate(tempSize)) { - NvcompJni.batchedLZ4DecompressAsync(inputAddrs, inputSizes, - tempBuffer.getAddress(), tempBuffer.getLength(), metadata, - outputAddrs, outputSizes, stream.getStream()); - } - } catch (Throwable t) { - for (DeviceMemoryBuffer buffer : outputs) { - if (buffer != null) { - buffer.close(); + private static DeviceMemoryBuffer buildAddrsSizesBuffer(long chunkSize, + long totalChunks, + BaseDeviceMemoryBuffer[] inputs, + int[] chunksPerInput, + BaseDeviceMemoryBuffer[] outputs, + Cuda.Stream stream) { + final long totalBufferSize = totalChunks * 8L * 4L; + try (NvtxRange range = new NvtxRange("buildAddrSizesBuffer", NvtxColor.YELLOW)) { + try (HostMemoryBuffer metadata = fetchMetadata(totalChunks, inputs, chunksPerInput, stream); + HostMemoryBuffer hostAddrsSizes = HostMemoryBuffer.allocate(totalBufferSize); + DeviceMemoryBuffer devAddrsSizes = DeviceMemoryBuffer.allocate(totalBufferSize)) { + // Build four long vectors in the AddrsSizes buffer: + // - compressed input address (one per chunk) + // - uncompressed output address (one per chunk) + // - compressed input size (one per chunk) + // - uncompressed input size (one per chunk) + final long srcAddrsOffset = 0; + final long destAddrsOffset = srcAddrsOffset + totalChunks * 8L; + final long srcSizesOffset = destAddrsOffset + totalChunks * 8L; + final long destSizesOffset = srcSizesOffset + totalChunks * 8L; + long chunkIdx = 0; + for (int inputIdx = 0; inputIdx < inputs.length; inputIdx++) { + final BaseDeviceMemoryBuffer input = inputs[inputIdx]; + final BaseDeviceMemoryBuffer output = outputs[inputIdx]; + final int numChunksInInput = chunksPerInput[inputIdx]; + long srcAddr = input.getAddress() + + BatchedLZ4Compressor.METADATA_BYTES_PER_CHUNK * numChunksInInput; + long destAddr = output.getAddress(); + final long chunkIdxEnd = chunkIdx + numChunksInInput; + while (chunkIdx < chunkIdxEnd) { + final long 
srcChunkSize = metadata.getLong(chunkIdx * 8); + final long destChunkSize = (chunkIdx < chunkIdxEnd - 1) ? chunkSize + : output.getAddress() + output.getLength() - destAddr; + hostAddrsSizes.setLong(srcAddrsOffset + chunkIdx * 8, srcAddr); + hostAddrsSizes.setLong(destAddrsOffset + chunkIdx * 8, destAddr); + hostAddrsSizes.setLong(srcSizesOffset + chunkIdx * 8, srcChunkSize); + hostAddrsSizes.setLong(destSizesOffset + chunkIdx * 8, destChunkSize); + srcAddr += srcChunkSize; + destAddr += destChunkSize; + ++chunkIdx; } } - throw t; + devAddrsSizes.copyFromHostBuffer(hostAddrsSizes, stream); + devAddrsSizes.incRefCount(); + return devAddrsSizes; } - - return outputs; - } finally { - NvcompJni.batchedLZ4DecompressDestroyMetadata(metadata); } } - /** Opaque metadata object for batched LZ4 decompression */ - public static class BatchedMetadata implements AutoCloseable { - private final BatchedMetadataCleaner cleaner; - private final long id; - private boolean closed = false; - - BatchedMetadata(long metadata) { - this.cleaner = new BatchedMetadataCleaner(metadata); - this.id = cleaner.id; - MemoryCleaner.register(this, cleaner); - cleaner.addRef(); - } - - long getMetadata() { - return cleaner.metadata; - } - - public boolean isLZ4Metadata() { - return NvcompJni.isLZ4Metadata(getMetadata()); - } - - @Override - public synchronized void close() { - if (!closed) { - cleaner.delRef(); - cleaner.clean(false); - closed = true; - } else { - cleaner.logRefCountDebug("double free " + this); - throw new IllegalStateException("Close called too many times " + this); - } - } - - @Override - public String toString() { - return "LZ4 BATCHED METADATA (ID: " + id + " " + - Long.toHexString(cleaner.metadata) + ")"; - } - - private static class BatchedMetadataCleaner extends MemoryCleaner.Cleaner { - private long metadata; - - BatchedMetadataCleaner(long metadata) { - this.metadata = metadata; - } - - @Override - protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) { - boolean neededCleanup = false; - long address = metadata; - if (metadata != 0) { - try { - NvcompJni.batchedLZ4DecompressDestroyMetadata(metadata); - } finally { - // Always mark the resource as freed even if an exception is thrown. - // We cannot know how far it progressed before the exception, and - // therefore it is unsafe to retry. - metadata = 0; - } - neededCleanup = true; - } - if (neededCleanup && logErrorIfNotClean) { - log.error("LZ4 BATCHED METADATA WAS LEAKED (Address: " + Long.toHexString(address) + ")"); - logRefCountDebug("Leaked event"); + /** + * Fetch the metadata at the front of each input in a single, contiguous host buffer. + * @param totalChunks total number of compressed chunks + * @param inputs buffers containing the compressed data + * @param chunksPerInput number of compressed chunks for the corresponding input + * @param stream CUDA stream to use + * @return host buffer containing all of the metadata + */ + private static HostMemoryBuffer fetchMetadata(long totalChunks, + BaseDeviceMemoryBuffer[] inputs, + int[] chunksPerInput, + Cuda.Stream stream) { + try (NvtxRange range = new NvtxRange("fetchMetadata", NvtxColor.PURPLE)) { + // one long per chunk containing the compressed size + final long totalMetadataSize = totalChunks * BatchedLZ4Compressor.METADATA_BYTES_PER_CHUNK; + // Build corresponding vectors of destination addresses, source addresses and sizes. 
+ long[] destAddrs = new long[inputs.length]; + long[] srcAddrs = new long[inputs.length]; + long[] sizes = new long[inputs.length]; + try (HostMemoryBuffer hostMetadata = HostMemoryBuffer.allocate(totalMetadataSize); + DeviceMemoryBuffer devMetadata = DeviceMemoryBuffer.allocate(totalMetadataSize)) { + long destCopyAddr = devMetadata.getAddress(); + for (int inputIdx = 0; inputIdx < inputs.length; inputIdx++) { + final BaseDeviceMemoryBuffer input = inputs[inputIdx]; + final long copySize = chunksPerInput[inputIdx] * BatchedLZ4Compressor.METADATA_BYTES_PER_CHUNK; + destAddrs[inputIdx] = destCopyAddr; + srcAddrs[inputIdx] = input.getAddress(); + sizes[inputIdx] = copySize; + destCopyAddr += copySize; } - return neededCleanup; - } - - @Override - public boolean isClean() { - return metadata != 0; + Cuda.multiBufferCopyAsync(destAddrs, srcAddrs, sizes, stream); + hostMetadata.copyFromDeviceBuffer(devMetadata, stream); + hostMetadata.incRefCount(); + return hostMetadata; } } } diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/CompressionType.java b/java/src/main/java/ai/rapids/cudf/nvcomp/CompressionType.java index 5a133acbf7c..70f0a021a4d 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/CompressionType.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/CompressionType.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,10 @@ public enum CompressionType { INT(4), UINT(5), LONGLONG(6), - ULONGLONG(7); + ULONGLONG(7), + BITS(0xff); + + private static final CompressionType[] types = CompressionType.values(); final int nativeId; @@ -33,6 +36,17 @@ public enum CompressionType { this.nativeId = nativeId; } + /** Lookup the CompressionType that corresponds to the specified native identifier */ + public static CompressionType fromNativeId(int id) { + for (CompressionType type : types) { + if (type.nativeId == id) { + return type; + } + } + throw new IllegalArgumentException("Unknown compression type ID: " + id); + } + + /** Get the native code identifier for the type */ public final int toNativeId() { return nativeId; } diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/Decompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/Decompressor.java deleted file mode 100644 index 90dabfbcf8e..00000000000 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/Decompressor.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package ai.rapids.cudf.nvcomp; - -import ai.rapids.cudf.Cuda; -import ai.rapids.cudf.BaseDeviceMemoryBuffer; -import ai.rapids.cudf.MemoryCleaner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Generic single-buffer decompressor interface */ -public class Decompressor { - private static final Logger log = LoggerFactory.getLogger(Decompressor.class); - - /** - * Get the metadata associated with a compressed buffer - * @param buffer compressed data buffer - * @param stream CUDA stream to use - * @return opaque metadata object - */ - public static Metadata getMetadata(BaseDeviceMemoryBuffer buffer, Cuda.Stream stream) { - long metadata = NvcompJni.decompressGetMetadata(buffer.getAddress(), buffer.getLength(), - stream.getStream()); - return new Metadata(metadata); - } - - /** - * Get the amount of temporary storage space required to decompress a buffer. - * @param metadata metadata retrieved from the compressed data - * @return amount in bytes of temporary storage space required to decompress - */ - public static long getTempSize(Metadata metadata) { - return NvcompJni.decompressGetTempSize(metadata.getMetadata()); - } - - /** - * Get the amount of output storage space required to hold the uncompressed data. - * @param metadata metadata retrieved from the compressed data - * @return amount in bytes of output storage space required to decompress - */ - public static long getOutputSize(Metadata metadata) { - return NvcompJni.decompressGetOutputSize(metadata.getMetadata()); - } - - /** - * Asynchronously decompress a buffer. - * @param input compressed data buffer - * @param tempBuffer temporary storage buffer - * @param metadata metadata retrieved from compressed data - * @param output output storage buffer - * @param stream CUDA stream to use - */ - public static void decompressAsync(BaseDeviceMemoryBuffer input, BaseDeviceMemoryBuffer tempBuffer, - Metadata metadata, BaseDeviceMemoryBuffer output, Cuda.Stream stream) { - NvcompJni.decompressAsync( - input.getAddress(), input.getLength(), - tempBuffer.getAddress(), tempBuffer.getLength(), - metadata.getMetadata(), - output.getAddress(), output.getLength(), - stream.getStream()); - } - - /** - * Determine if a buffer is data compressed with LZ4. 
- * @param buffer data to examine - * @return true if the data is LZ4 compressed - */ - public static boolean isLZ4Data(BaseDeviceMemoryBuffer buffer) { - return NvcompJni.isLZ4Data(buffer.getAddress(), buffer.getLength()); - } - - - /** Opaque metadata object for single-buffer decompression */ - public static class Metadata implements AutoCloseable { - private final MetadataCleaner cleaner; - private final long id; - private boolean closed = false; - - Metadata(long metadata) { - this.cleaner = new MetadataCleaner(metadata); - this.id = cleaner.id; - MemoryCleaner.register(this, cleaner); - cleaner.addRef(); - } - - long getMetadata() { - return cleaner.metadata; - } - - /** - * Determine if this metadata is associated with LZ4-compressed data - * @return true if the metadata is associated with LZ4-compressed data - */ - public boolean isLZ4Metadata() { - return NvcompJni.isLZ4Metadata(getMetadata()); - } - - @Override - public synchronized void close() { - if (!closed) { - cleaner.delRef(); - cleaner.clean(false); - closed = true; - } else { - cleaner.logRefCountDebug("double free " + this); - throw new IllegalStateException("Close called too many times " + this); - } - } - - @Override - public String toString() { - return "DECOMPRESSOR METADATA (ID: " + id + " " + - Long.toHexString(cleaner.metadata) + ")"; - } - - private static class MetadataCleaner extends MemoryCleaner.Cleaner { - private long metadata; - - MetadataCleaner(long metadata) { - this.metadata = metadata; - } - - @Override - protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) { - boolean neededCleanup = false; - long address = metadata; - if (metadata != 0) { - try { - NvcompJni.decompressDestroyMetadata(metadata); - } finally { - // Always mark the resource as freed even if an exception is thrown. - // We cannot know how far it progressed before the exception, and - // therefore it is unsafe to retry. - metadata = 0; - } - neededCleanup = true; - } - if (neededCleanup && logErrorIfNotClean) { - log.error("DECOMPRESSOR METADATA WAS LEAKED (Address: " + - Long.toHexString(address) + ")"); - logRefCountDebug("Leaked event"); - } - return neededCleanup; - } - - @Override - public boolean isClean() { - return metadata != 0; - } - } - } -} diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/LZ4Compressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/LZ4Compressor.java index ce7012a3bee..67a770f1346 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/LZ4Compressor.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/LZ4Compressor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,46 +18,54 @@ import ai.rapids.cudf.Cuda; import ai.rapids.cudf.BaseDeviceMemoryBuffer; +import ai.rapids.cudf.DeviceMemoryBuffer; import ai.rapids.cudf.HostMemoryBuffer; /** Single-buffer compressor implementing LZ4 */ public class LZ4Compressor { - /** - * Calculate the amount of temporary storage space required to compress a buffer. 
- * @param input buffer to compress - * @param inputType type of data within the buffer - * @param chunkSize compression chunk size to use - * @return amount in bytes of temporary storage space required to compress the buffer - */ - public static long getTempSize(BaseDeviceMemoryBuffer input, CompressionType inputType, - long chunkSize) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); + /** LZ4 compression settings corresponding to a chunk size */ + public static final class Configuration { + private final long metadataBytes; + private final long tempBytes; + private final long maxCompressedBytes; + + Configuration(long metadataBytes, long tempBytes, long maxCompressedBytes) { + this.metadataBytes = metadataBytes; + this.tempBytes = tempBytes; + this.maxCompressedBytes = maxCompressedBytes; + } + + /** Get the size of the metadata information in bytes */ + public long getMetadataBytes() { + return metadataBytes; + } + + /** Get the size of the temporary storage in bytes needed to compress */ + public long getTempBytes() { + return tempBytes; + } + + /** Get the maximum compressed output size in bytes */ + public long getMaxCompressedBytes() { + return maxCompressedBytes; } - return NvcompJni.lz4CompressGetTempSize(input.getAddress(), input.getLength(), - inputType.nativeId, chunkSize); } /** - * Calculate the amount of output storage space required to compress a buffer. - * @param input buffer to compress - * @param inputType type of data within the buffer - * @param chunkSize compression chunk size to use - * @param tempBuffer temporary storage space - * @return amount in bytes of output storage space required to compress the buffer + * Get the compression configuration necessary for a particular chunk size. + * @param chunkSize size of an LZ4 chunk in bytes + * @param uncompressedSize total size of the uncompressed data + * @return compression configuration for the specified chunk size */ - public static long getOutputSize(BaseDeviceMemoryBuffer input, CompressionType inputType, - long chunkSize, BaseDeviceMemoryBuffer tempBuffer) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); - } - return NvcompJni.lz4CompressGetOutputSize(input.getAddress(), input.getLength(), - inputType.nativeId, chunkSize, tempBuffer.getAddress(), tempBuffer.getLength(), false); + public static Configuration configure(long chunkSize, long uncompressedSize) { + long[] configs = NvcompJni.lz4CompressConfigure(chunkSize, uncompressedSize); + assert configs.length == 3; + return new Configuration(configs[0], configs[1], configs[2]); } /** - * Compress a buffer with LZ4. + * Synchronously compress a buffer with LZ4. 
* @param input buffer to compress * @param inputType type of data within the buffer * @param chunkSize compression chunk size to use @@ -72,16 +80,19 @@ public static long compress(BaseDeviceMemoryBuffer input, CompressionType inputT if (chunkSize <= 0) { throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); } - return NvcompJni.lz4Compress(input.getAddress(), input.getLength(), inputType.nativeId, - chunkSize, tempBuffer.getAddress(), tempBuffer.getLength(), - output.getAddress(), output.getLength(), stream.getStream()); + try (DeviceMemoryBuffer devOutputSizeBuffer = DeviceMemoryBuffer.allocate(Long.BYTES); + HostMemoryBuffer hostOutputSizeBuffer = HostMemoryBuffer.allocate(Long.BYTES)) { + compressAsync(devOutputSizeBuffer, input, inputType, chunkSize, tempBuffer, output, stream); + hostOutputSizeBuffer.copyFromDeviceBuffer(devOutputSizeBuffer, stream); + return hostOutputSizeBuffer.getLong(0); + } } /** * Asynchronously compress a buffer with LZ4. The compressed size output buffer must be pinned * memory for this operation to be truly asynchronous. Note that the caller must synchronize * on the specified CUDA stream in order to safely examine the compressed output size! - * @param compressedSizeOutputBuffer host memory where the compressed output size will be stored + * @param compressedSizeOutputBuffer device memory where the compressed output size will be stored * @param input buffer to compress * @param inputType type of data within the buffer * @param chunkSize compression chunk size to use @@ -89,7 +100,7 @@ public static long compress(BaseDeviceMemoryBuffer input, CompressionType inputT * @param output buffer that will contain the compressed result * @param stream CUDA stream to use */ - public static void compressAsync(HostMemoryBuffer compressedSizeOutputBuffer, + public static void compressAsync(DeviceMemoryBuffer compressedSizeOutputBuffer, BaseDeviceMemoryBuffer input, CompressionType inputType, long chunkSize, BaseDeviceMemoryBuffer tempBuffer, BaseDeviceMemoryBuffer output, Cuda.Stream stream) { @@ -100,9 +111,16 @@ public static void compressAsync(HostMemoryBuffer compressedSizeOutputBuffer, throw new IllegalArgumentException("compressed output size buffer must be able to hold " + "at least 8 bytes, size is only " + compressedSizeOutputBuffer.getLength()); } - NvcompJni.lz4CompressAsync(compressedSizeOutputBuffer.getAddress(), - input.getAddress(), input.getLength(), inputType.nativeId, chunkSize, - tempBuffer.getAddress(), tempBuffer.getLength(), output.getAddress(), output.getLength(), + NvcompJni.lz4CompressAsync( + compressedSizeOutputBuffer.getAddress(), + input.getAddress(), + input.getLength(), + inputType.nativeId, + chunkSize, + tempBuffer.getAddress(), + tempBuffer.getLength(), + output.getAddress(), + output.getLength(), stream.getStream()); } } diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/LZ4Decompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/LZ4Decompressor.java new file mode 100644 index 00000000000..46b3127581b --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/LZ4Decompressor.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf.nvcomp; + +import ai.rapids.cudf.BaseDeviceMemoryBuffer; +import ai.rapids.cudf.Cuda; + +/** Single-buffer decompression using LZ4 */ +public class LZ4Decompressor { + + /** + * LZ4 decompression settings corresponding to an LZ4 compressed input. + * NOTE: Each instance must be closed to avoid a native memory leak. + */ + public static final class Configuration implements AutoCloseable { + private final long metadataPtr; + private final long metadataSize; + private final long tempBytes; + private final long uncompressedBytes; + + Configuration(long metadataPtr, long metadataSize, long tempBytes, + long uncompressedBytes) { + this.metadataPtr = metadataPtr; + this.metadataSize = metadataSize; + this.tempBytes = tempBytes; + this.uncompressedBytes = uncompressedBytes; + } + + /** Get the host address of the metadata */ + public long getMetadataPtr() { + return metadataPtr; + } + + /** Get the size of the metadata in bytes */ + public long getMetadataSize() { + return metadataSize; + } + + /** Get the size of the temporary buffer in bytes needed to decompress */ + public long getTempBytes() { + return tempBytes; + } + + /** Get the size of the uncompressed data in bytes */ + public long getUncompressedBytes() { + return uncompressedBytes; + } + + @Override + public void close() { + NvcompJni.lz4DestroyMetadata(metadataPtr); + } + } + + /** + * Determine if a buffer is data compressed with LZ4. + * @param buffer data to examine + * @param stream CUDA stream to use + * @return true if the data is LZ4 compressed + */ + public static boolean isLZ4Data(BaseDeviceMemoryBuffer buffer, Cuda.Stream stream) { + return NvcompJni.isLZ4Data(buffer.getAddress(), buffer.getLength(), stream.getStream()); + } + + /** + * Get the decompression configuration from compressed data. + * NOTE: The resulting configuration object must be closed to avoid a native memory leak. + * @param compressed data that has been compressed by the LZ4 compressor + * @param stream CUDA stream to use + * @return decompression configuration for the specified input + */ + public static Configuration configure(BaseDeviceMemoryBuffer compressed, Cuda.Stream stream) { + long[] configs = NvcompJni.lz4DecompressConfigure(compressed.getAddress(), + compressed.getLength(), stream.getStream()); + assert configs.length == 4; + return new Configuration(configs[0], configs[1], configs[2], configs[3]); + } + + /** + * Asynchronously decompress data compressed with the LZ4 compressor. 
+ * @param compressed buffer containing LZ4-compressed data + * @param config decompression configuration + * @param temp temporary storage buffer + * @param outputBuffer buffer that will be written with the uncompressed output + * @param stream CUDA stream to use + */ + public static void decompressAsync( + BaseDeviceMemoryBuffer compressed, + Configuration config, + BaseDeviceMemoryBuffer temp, + BaseDeviceMemoryBuffer outputBuffer, + Cuda.Stream stream) { + NvcompJni.lz4DecompressAsync( + compressed.getAddress(), + compressed.getLength(), + config.getMetadataPtr(), + config.getMetadataSize(), + temp.getAddress(), + temp.getLength(), + outputBuffer.getAddress(), + outputBuffer.getLength(), + stream.getStream()); + } +} diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java b/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java index 5ce0a8d815d..58f8390d0eb 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,69 +24,14 @@ class NvcompJni { NativeDepsLoader.loadNativeDeps(); } - /** - * Extracts the metadata from the input on the device and copies - * it to the host. Note that the result must be released with a - * call to decompressDestroyMetadata - * @param inPtr device address of the compressed data - * @param inSize size of the compressed data in bytes - * @param stream address of CUDA stream that will be used for synchronization - * @return address of the metadata on the host - */ - static native long decompressGetMetadata(long inPtr, long inSize, long stream); - - /** - * Destroys the metadata object and frees the associated memory. - * @param metadataPtr address of the metadata object - */ - static native void decompressDestroyMetadata(long metadataPtr); - - /** - * Computes the temporary storage size needed to decompress. - * This over-estimates the needed storage considerably. - * @param metadataPtr address of the metadata object - * @return the number of temporary storage bytes needed to decompress - */ - static native long decompressGetTempSize(long metadataPtr); - - /** - * Computes the decompressed size of the data. Gets this from the - * metadata contained in the compressed data. - * @param metadataPtr address of the metadata object - * @return the size of the decompressed data in bytes - */ - static native long decompressGetOutputSize(long metadataPtr); - - /** - * Perform asynchronous decompression using the specified CUDA stream. - * The input, temporary, and output buffers must all be in GPU-accessible - * memory. 
- * @param inPtr device address of the compressed buffer - * @param inSize size of the compressed data in bytes - * @param tempPtr device address of the temporary decompression storage buffer - * @param tempSize size of the temporary decompression storage buffer - * @param metadataPtr address of the metadata object - * @param outPtr device address of the buffer to use for uncompressed output - * @param outSize size of the uncompressed output buffer in bytes - * @param stream CUDA stream to use - */ - static native void decompressAsync( - long inPtr, - long inSize, - long tempPtr, - long tempSize, - long metadataPtr, - long outPtr, - long outSize, - long stream); - /** * Determine if data is compressed with the nvcomp LZ4 compressor. * @param inPtr device address of the compressed data * @param inSize size of the compressed data in bytes + * @param stream CUDA stream to use * @return true if the data is compressed with the nvcomp LZ4 compressor */ - static native boolean isLZ4Data(long inPtr, long inSize); + static native boolean isLZ4Data(long inPtr, long inSize, long stream); /** * Determine if the metadata corresponds to data compressed with the nvcomp LZ4 compressor. @@ -96,45 +41,21 @@ static native void decompressAsync( static native boolean isLZ4Metadata(long metadataPtr); /** - * Calculate the temporary buffer size needed for LZ4 compression. - * @param inPtr device address of the uncompressed data - * @param inSize size of the uncompressed data in bytes - * @param inputType type of uncompressed data - * @param chunkSize size of an LZ4 chunk in bytes - * @return number of temporary storage bytes needed to compress + * Return the LZ4 compression configuration necessary for a particular chunk size. + * @param chunkSize maximum size of an uncompressed chunk in bytes + * @param uncompressedSize total size of the uncompressed data + * @return array of three longs containing metadata size, temp storage size, + * and output buffer size */ - static native long lz4CompressGetTempSize( - long inPtr, - long inSize, - int inputType, - long chunkSize); + static native long[] lz4CompressConfigure(long chunkSize, long uncompressedSize); /** - * Calculate the output buffer size for LZ4 compression. The output - * size can be an estimated upper bound or the exact value. - * @param inPtr device address of the uncompressed data - * @param inSize size of the uncompressed data in bytes - * @param inputType type of uncompressed data - * @param chunkSize size of an LZ4 chunk in bytes - * @param tempPtr device address of the temporary storage buffer - * @param tempSize size of the temporary storage buffer in bytes - * @param computeExactSize set to true to compute the exact output size - * @return output buffer size in bytes. If computeExactSize is true then - * this is an exact size otherwise it is a maximum, worst-case size of the - * compressed data. - */ - static native long lz4CompressGetOutputSize( - long inPtr, - long inSize, - int inputType, - long chunkSize, - long tempPtr, - long tempSize, - boolean computeExactSize); - - /** - * Perform LZ4 compression synchronously using the specified CUDA - * stream. + * Perform LZ4 compression asynchronously using the specified CUDA stream. + * @param compressedSizeOutputPtr host address of a 64-bit integer to update + * with the resulting compressed size of the + * data. For the operation to be truly + * asynchronous this should point to pinned + * host memory. 
* @param inPtr device address of the uncompressed data * @param inSize size of the uncompressed data in bytes * @param inputType type of uncompressed data @@ -144,9 +65,9 @@ static native long lz4CompressGetOutputSize( * @param outPtr device address of the output buffer * @param outSize size of the output buffer in bytes * @param stream CUDA stream to use - * @return size of the compressed output in bytes */ - static native long lz4Compress( + static native void lz4CompressAsync( + long compressedSizeOutputPtr, long inPtr, long inSize, int inputType, @@ -158,29 +79,33 @@ static native long lz4Compress( long stream); /** - * Perform LZ4 compression synchronously using the specified CUDA - * stream. - * @param compressedSizeOutputPtr host address of a 64-bit integer to update - * with the resulting compressed size of the - * data. For the operation to be truly - * asynchronous this should point to pinned - * host memory. + * Return the decompression configuration for a compressed input. + * NOTE: The resulting configuration object must be closed to destroy the corresponding + * host-side metadata created by this method to avoid a native memory leak. + * @param inPtr device address of the compressed data + * @param inSize size of the compressed data + * @return array of four longs containing metadata address, metadata size, temp storage size, + * and output buffer size + */ + static native long[] lz4DecompressConfigure(long inPtr, long inSize, long stream); + + /** + * Perform LZ4 decompression asynchronously using the specified CUDA stream. * @param inPtr device address of the uncompressed data * @param inSize size of the uncompressed data in bytes - * @param inputType type of uncompressed data - * @param chunkSize size of an LZ4 chunk in bytes + * @param metadataPtr host address of the metadata + * @param metadataSize size of the metadata in bytes * @param tempPtr device address of the temporary compression storage buffer * @param tempSize size of the temporary storage buffer in bytes * @param outPtr device address of the output buffer * @param outSize size of the output buffer in bytes * @param stream CUDA stream to use */ - static native void lz4CompressAsync( - long compressedSizeOutputPtr, + static native void lz4DecompressAsync( long inPtr, long inSize, - int inputType, - long chunkSize, + long metadataPtr, + long metadataSize, long tempPtr, long tempSize, long outPtr, @@ -188,229 +113,99 @@ static native void lz4CompressAsync( long stream); /** - * Extracts the metadata from the batch of inputs on the device and - * copies them to the host. This synchronizes on the stream. - * @param inPtrs device addresses of the compressed buffers in the batch - * @param inSizes corresponding byte sizes of the compressed buffers - * @param stream CUDA stream to use - * @return handle to the batched decompress metadata on the host - */ - static native long batchedLZ4DecompressGetMetadata( - long[] inPtrs, - long[] inSizes, - long stream); - - /** - * Destroys batched metadata and frees the underlying host memory. - * @param batchedMetadata handle to the batched decompress metadata + * Destroy host-side metadata created by {@link NvcompJni#lz4DecompressConfigure(long, long, long)} + * @param metadataPtr host address of metadata */ - static native void batchedLZ4DecompressDestroyMetadata(long batchedMetadata); + static native void lz4DestroyMetadata(long metadataPtr); /** - * Computes the temporary storage size in bytes needed to decompress - * the compressed batch. 
- * @param batchedMetadata handle to the batched metadata - * @return number of temporary storage bytes needed to decompress the batch - */ - static native long batchedLZ4DecompressGetTempSize(long batchedMetadata); - - /** - * Computes the decompressed size of each chunk in the batch. - * @param batchedMetadata handle to the batched metadata - * @param numOutputs number of output buffers in the batch - * @return Array of corresponding output sizes needed to decompress - * each buffer in the batch. - */ - static native long[] batchedLZ4DecompressGetOutputSize( - long batchedMetadata, - long numOutputs); - - /** - * Asynchronously decompress a batch of compressed data buffers. - * @param inPtrs device addresses of the compressed buffers - * @param inSizes corresponding byte sizes of the compressed buffers - * @param tempPtr device address of the temporary decompression space - * @param tempSize size of the temporary decompression space in bytes - * @param batchedMetadata handle to the batched metadata - * @param outPtrs device addresses of the uncompressed output buffers - * @param outSizes corresponding byte sizes of the uncompressed output buffers - * @param stream CUDA stream to use - */ - static native void batchedLZ4DecompressAsync( - long[] inPtrs, - long[] inSizes, - long tempPtr, - long tempSize, - long batchedMetadata, - long[] outPtrs, - long[] outSizes, - long stream); - - /** - * Get the temporary workspace size required to perform compression of entire batch. - * @param inPtrs device addresses of the uncompressed buffers - * @param inSizes corresponding byte sizes of the uncompressed buffers - * @param chunkSize size of an LZ4 chunk in bytes + * Get the temporary workspace size required to perform compression of entire LZ4 batch. + * @param batchSize number of chunks in the batch + * @param maxChunkSize maximum size of an uncompressed chunk in bytes * @return The size of required temporary workspace in bytes to compress the batch. */ - static native long batchedLZ4CompressGetTempSize( - long[] inPtrs, - long[] inSizes, - long chunkSize); + static native long batchedLZ4CompressGetTempSize(long batchSize, long maxChunkSize); /** - * Get the required output sizes of each chunk to perform compression. - * @param inPtrs device addresses of the uncompressed buffers - * @param inSizes corresponding byte sizes of the uncompressed buffers - * @param chunkSize size of an LZ4 chunk in bytes - * @param tempPtr device address of the temporary workspace buffer - * @param tempSize size of the temporary workspace buffer in bytes - * @return array of corresponding sizes in bytes of the output buffers needed - * to compress the buffers in the batch. + * Get the maximum size any chunk could compress to in a LZ4 batch. This is the minimum amount of + * output memory to allocate per chunk when batch compressing. + * @param maxChunkSize maximum size of an uncompressed chunk size in bytes + * @return maximum compressed output size of a chunk */ - static native long[] batchedLZ4CompressGetOutputSize( - long[] inPtrs, - long[] inSizes, - long chunkSize, - long tempPtr, - long tempSize); + static native long batchedLZ4CompressGetMaxOutputChunkSize(long maxChunkSize); /** - * Asynchronously compress a batch of buffers. Note that + * Asynchronously compress a batch of buffers with LZ4. Note that * compressedSizesOutPtr must point to pinned memory for this operation * to be asynchronous. 
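+ * <p>Editorial sketch of the sizing calls typically made before this method; the values of
+ * {@code batchSize} and {@code maxChunkSize} are placeholders supplied by the caller:
+ * <pre>{@code
+ * long tempSize = NvcompJni.batchedLZ4CompressGetTempSize(batchSize, maxChunkSize);
+ * long maxOutChunkSize = NvcompJni.batchedLZ4CompressGetMaxOutputChunkSize(maxChunkSize);
+ * // allocate tempSize bytes of device scratch and at least maxOutChunkSize bytes per output chunk
+ * }</pre>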
- * @param compressedSizesOutPtr host address where to write the sizes of the + * @param devInPtrs device address of uncompressed buffer addresses vector + * @param devInSizes device address of uncompressed buffer sizes vector + * @param chunkSize maximum size of an uncompressed chunk in bytes + * @param batchSize number of chunks in the batch + * @param tempPtr device address of the temporary workspace buffer + * @param tempSize size of the temporary workspace buffer in bytes + * @param devOutPtrs device address of output buffer addresses vector + * @param compressedSizesOutPtr device address where to write the sizes of the * compressed data written to the corresponding * output buffers. Must point to a buffer with * at least 8 bytes of memory per output buffer - * in the batch. For asynchronous operation - * this must point to pinned host memory. - * @param inPtrs device addresses of the uncompressed buffers - * @param inSizes corresponding byte sizes of the uncompressed buffers - * @param chunkSize size of an LZ4 chunk in bytes - * @param tempPtr device address of the temporary workspace buffer - * @param tempSize size of the temporary workspace buffer in bytes - * @param outPtrs device addresses of the output compressed buffers - * @param outSizes corresponding sizes in bytes of the output buffers + * in the batch. * @param stream CUDA stream to use */ static native void batchedLZ4CompressAsync( - long compressedSizesOutPtr, - long[] inPtrs, - long[] inSizes, + long devInPtrs, + long devInSizes, long chunkSize, + long batchSize, long tempPtr, long tempSize, - long[] outPtrs, - long[] outSizes, + long devOutPtrs, + long compressedSizesOutPtr, long stream); /** - * Calculate the temporary buffer size needed for cascaded compression. - * @param inPtr device address of the uncompressed data - * @param inSize size of the uncompressed data in bytes - * @param inputType type of uncompressed data - * @param numRLEs number of Run Length Encoding layers to use - * @param numDeltas number of delta layers to use - * @param useBitPacking set to true if bit-packing should be used - * @return number of temporary storage bytes needed to compress - */ - static native long cascadedCompressGetTempSize( - long inPtr, - long inSize, - int inputType, - int numRLEs, - int numDeltas, - boolean useBitPacking); - - /** - * Calculate the output buffer size for cascaded compression. The output - * size can be an estimated upper bound or the exact value. - * @param inPtr device address of the uncompressed data - * @param inSize size of the uncompressed data in bytes - * @param inputType type of uncompressed data - * @param numRLEs number of Run Length Encoding layers to use - * @param numDeltas number of delta layers to use - * @param useBitPacking set to true if bit-packing should be used - * @param tempPtr device address of the temporary compression storage buffer - * @param tempSize size of the temporary storage buffer in bytes - * @param computeExactSize set to true to compute the exact output size - * @return output buffer size in bytes. If computeExactSize is true then - * this is an exact size otherwise it is a maximum, worst-case size of the - * compressed data. + * Computes the temporary storage size in bytes needed to decompress a LZ4-compressed batch. 
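+ * <p>Editorial sketch of how the returned size is typically consumed; {@code numChunks},
+ * {@code maxUncompressedChunkBytes}, and the surrounding decompress call are placeholders
+ * for caller-side code:
+ * <pre>{@code
+ * long tempSize = NvcompJni.batchedLZ4DecompressGetTempSize(numChunks, maxUncompressedChunkBytes);
+ * try (DeviceMemoryBuffer tempBuffer = DeviceMemoryBuffer.allocate(tempSize)) {
+ *   // tempBuffer.getAddress() and tempSize feed the batchedLZ4DecompressAsync call below
+ * }
+ * }</pre>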
+ * @param numChunks number of chunks in the batch + * @param maxUncompressedChunkBytes maximum uncompressed size of any chunk in bytes + * @return number of temporary storage bytes needed to decompress the batch */ - static native long cascadedCompressGetOutputSize( - long inPtr, - long inSize, - int inputType, - int numRLEs, - int numDeltas, - boolean useBitPacking, - long tempPtr, - long tempSize, - boolean computeExactSize); + static native long batchedLZ4DecompressGetTempSize( + long numChunks, + long maxUncompressedChunkBytes); /** - * Perform cascaded compression synchronously using the specified CUDA - * stream. - * @param inPtr device address of the uncompressed data - * @param inSize size of the uncompressed data in bytes - * @param inputType type of uncompressed data - * @param numRLEs number of Run Length Encoding layers to use - * @param numDeltas number of delta layers to use - * @param useBitPacking set to true if bit-packing should be used - * @param tempPtr device address of the temporary compression storage buffer - * @param tempSize size of the temporary storage buffer in bytes - * @param outPtr device address of the output buffer - * @param outSize size of the output buffer in bytes + * Asynchronously decompress a batch of LZ4-compressed data buffers. + * @param devInPtrs device address of compressed input buffer addresses vector + * @param devInSizes device address of compressed input buffer sizes vector + * @param devOutSizes device address of uncompressed buffer sizes vector + * @param batchSize number of buffers in the batch + * @param tempPtr device address of the temporary decompression space + * @param tempSize size of the temporary decompression space in bytes + * @param devOutPtrs device address of uncompressed output buffer addresses vector * @param stream CUDA stream to use - * @return size of the compressed output in bytes */ - static native long cascadedCompress( - long inPtr, - long inSize, - int inputType, - int numRLEs, - int numDeltas, - boolean useBitPacking, + static native void batchedLZ4DecompressAsync( + long devInPtrs, + long devInSizes, + long devOutSizes, + long batchSize, long tempPtr, long tempSize, - long outPtr, - long outSize, + long devOutPtrs, long stream); /** - * Perform cascaded compression asynchronously using the specified CUDA - * stream. Note if the compressedSizeOutputPtr argument points to paged - * memory then this may synchronize in practice due to limitations of - * copying from the device to paged memory. - * @param compressedSizeOutputPtr address of a 64-bit integer to update with - * the resulting compressed size of the data. - * For the operation to be truly asynchronous - * this should point to pinned host memory. - * @param inPtr device address of the uncompressed data - * @param inSize size of the uncompressed data in bytes - * @param inputType type of uncompressed data - * @param numRLEs number of Run Length Encoding layers to use - * @param numDeltas number of delta layers to use - * @param useBitPacking set to true if bit-packing should be used - * @param tempPtr device address of the temporary compression storage buffer - * @param tempSize size of the temporary storage buffer in bytes - * @param outPtr device address of the output buffer - * @param outSize size of the output buffer in bytes + * Asynchronously calculates the decompressed size needed for each chunk. 
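+ * <p>Editorial ordering sketch; the device-side vectors ({@code devInPtrs}, {@code devInSizes},
+ * {@code devOutSizes}, {@code devOutPtrs}), the temporary buffer, and {@code stream} are assumed
+ * to be prepared by the caller:
+ * <pre>{@code
+ * // 1. query the uncompressed size of each chunk into devOutSizes (device memory)
+ * NvcompJni.batchedLZ4GetDecompressSizeAsync(devInPtrs, devInSizes, devOutSizes, batchSize, stream);
+ * // 2. copy devOutSizes to the host, allocate output buffers, and fill devOutPtrs
+ * // 3. decompress the whole batch
+ * NvcompJni.batchedLZ4DecompressAsync(devInPtrs, devInSizes, devOutSizes, batchSize,
+ *     tempPtr, tempSize, devOutPtrs, stream);
+ * }</pre>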
+ * @param devInPtrs device address of compressed input buffer addresses vector + * @param devInSizes device address of compressed input buffer sizes vector + * @param devOutSizes device address of calculated decompress sizes vector + * @param batchSize number of buffers in the batch * @param stream CUDA stream to use */ - static native void cascadedCompressAsync( - long compressedSizeOutputPtr, - long inPtr, - long inSize, - int inputType, - int numRLEs, - int numDeltas, - boolean useBitPacking, - long tempPtr, - long tempSize, - long outPtr, - long outSize, + static native void batchedLZ4GetDecompressSizeAsync( + long devInPtrs, + long devInSizes, + long devOutSizes, + long batchSize, long stream); } diff --git a/java/src/main/native/CMakeLists.txt b/java/src/main/native/CMakeLists.txt index 2c95c6eebac..b5c7cfe8e6f 100755 --- a/java/src/main/native/CMakeLists.txt +++ b/java/src/main/native/CMakeLists.txt @@ -184,11 +184,21 @@ endif() ################################################################################################### # - nvcomp ---------------------------------------------------------------------------------------- -include(ConfigureNvcomp) -if(NVCOMP_FOUND) - message(STATUS "nvcomp compression library found in ${NVCOMP_ROOT}") +find_path(NVCOMP_INCLUDE "nvcomp" + HINTS "${CUDF_CPP_BUILD_DIR}/_deps/nvcomp-src/include" + "$ENV{CONDA_PREFIX}/include") + +message(STATUS "NVCOMP: NVCOMP_INCLUDE set to ${NVCOMP_INCLUDE}") + +set(CUDF_JNI_NVCOMP_LIBNAME "libnvcomp.a") +find_library(NVCOMP_LIBRARY ${CUDF_JNI_NVCOMP_LIBNAME} REQUIRED + HINTS "${CUDF_CPP_BUILD_DIR}/lib" + "$ENV{CONDA_PREFIX}/lib") + +if(NOT NVCOMP_LIBRARY) + message(FATAL_ERROR "nvcomp static library not found.") else() - message(FATAL_ERROR "nvcomp compression library not found.") + message(STATUS "NVCOMP: NVCOMP_LIBRARY set to ${NVCOMP_LIBRARY}") endif() ################################################################################################### @@ -218,7 +228,8 @@ add_library(cudfjni SHARED src/RmmJni.cpp src/ScalarJni.cpp src/TableJni.cpp - src/map_lookup.cu) + src/map_lookup.cu + src/check_nvcomp_output_sizes.cu) ################################################################################################### # - include paths --------------------------------------------------------------------------------- @@ -229,7 +240,7 @@ target_include_directories(cudfjni "${CUB_INCLUDE}" "${LIBCUDACXX_INCLUDE}" "${CUDAToolkit_INCLUDE_DIRS}" - "${NVCOMP_INCLUDE_DIR}" + "${NVCOMP_INCLUDE}" "${CMAKE_BINARY_DIR}/include" "${CMAKE_SOURCE_DIR}/include" "${SPDLOG_INCLUDE}" @@ -293,7 +304,7 @@ target_compile_definitions(cudfjni PUBLIC SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${RMM ################################################################################################### # - link libraries -------------------------------------------------------------------------------- -target_link_libraries(cudfjni PRIVATE nvcomp ${CUDF_LIB} ${ARROW_LIBRARY}) +target_link_libraries(cudfjni PRIVATE ${NVCOMP_LIBRARY} ${CUDF_LIB} ${ARROW_LIBRARY}) ################################################################################################### # - cudart options -------------------------------------------------------------------------------- diff --git a/java/src/main/native/cmake/Modules/ConfigureNvcomp.cmake b/java/src/main/native/cmake/Modules/ConfigureNvcomp.cmake deleted file mode 100644 index 1a0083e4518..00000000000 --- a/java/src/main/native/cmake/Modules/ConfigureNvcomp.cmake +++ /dev/null @@ -1,79 +0,0 @@ 
-#============================================================================= -# Copyright (c) 2020-2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#============================================================================= - -set(NVCOMP_ROOT "${CMAKE_BINARY_DIR}/nvcomp") - -if(CUDA_STATIC_RUNTIME) - set(NVCOMP_CUDA_RUNTIME_LIBRARY Static) -else() - set(NVCOMP_CUDA_RUNTIME_LIBRARY Shared) -endif() - -set(NVCOMP_CMAKE_ARGS "-DCMAKE_CUDA_RUNTIME_LIBRARY=${NVCOMP_CUDA_RUNTIME_LIBRARY} -DUSE_RMM=ON -DCUB_DIR=${CUB_INCLUDE}") - -configure_file("${CMAKE_SOURCE_DIR}/cmake/Templates/Nvcomp.CMakeLists.txt.cmake" - "${NVCOMP_ROOT}/CMakeLists.txt") - -file(MAKE_DIRECTORY "${NVCOMP_ROOT}/build") - -execute_process(COMMAND ${CMAKE_COMMAND} -G ${CMAKE_GENERATOR} . - RESULT_VARIABLE NVCOMP_CONFIG - WORKING_DIRECTORY ${NVCOMP_ROOT}) - -if(NVCOMP_CONFIG) - message(FATAL_ERROR "Configuring nvcomp failed: " ${NVCOMP_CONFIG}) -endif() - -set(PARALLEL_BUILD -j) -if($ENV{PARALLEL_LEVEL}) - set(NUM_JOBS $ENV{PARALLEL_LEVEL}) - set(PARALLEL_BUILD "${PARALLEL_BUILD}${NUM_JOBS}") -endif() - -if(${NUM_JOBS}) - if(${NUM_JOBS} EQUAL 1) - message(STATUS "NVCOMP BUILD: Enabling Sequential CMake build") - elseif(${NUM_JOBS} GREATER 1) - message(STATUS "NVCOMP BUILD: Enabling Parallel CMake build with ${NUM_JOBS} jobs") - endif() -else() - message(STATUS "NVCOMP BUILD: Enabling Parallel CMake build with all threads") -endif() - -execute_process(COMMAND ${CMAKE_COMMAND} --build .. -- ${PARALLEL_BUILD} - RESULT_VARIABLE NVCOMP_BUILD - WORKING_DIRECTORY ${NVCOMP_ROOT}/build) - -if(NVCOMP_BUILD) - message(FATAL_ERROR "Building nvcomp failed: " ${NVCOMP_BUILD}) -endif() - -message(STATUS "nvcomp build completed at: " ${NVCOMP_ROOT}/build) - -set(NVCOMP_INCLUDE_DIR "${NVCOMP_ROOT}/build/include") -set(NVCOMP_LIBRARY_DIR "${NVCOMP_ROOT}/build/lib") - -find_library(NVCOMP_LIB nvcomp - NO_DEFAULT_PATH - HINTS "${NVCOMP_LIBRARY_DIR}") - -if(NVCOMP_LIB) - message(STATUS "nvcomp library: " ${NVCOMP_LIB}) - set(NVCOMP_FOUND TRUE) - - add_library(nvcomp STATIC IMPORTED) - set_target_properties(nvcomp PROPERTIES IMPORTED_LOCATION "${NVCOMP_LIB}") -endif() diff --git a/java/src/main/native/cmake/Templates/Nvcomp.CMakeLists.txt.cmake b/java/src/main/native/cmake/Templates/Nvcomp.CMakeLists.txt.cmake deleted file mode 100644 index 5761ef8fd1b..00000000000 --- a/java/src/main/native/cmake/Templates/Nvcomp.CMakeLists.txt.cmake +++ /dev/null @@ -1,33 +0,0 @@ -#============================================================================= -# Copyright (c) 2020-2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -#============================================================================= - -cmake_minimum_required(VERSION 3.12) - -project(nvcomp) - -include(ExternalProject) - -ExternalProject_Add(nvcomp - GIT_REPOSITORY https://github.com/NVIDIA/nvcomp.git - GIT_TAG v1.2.1 - GIT_SHALLOW true - SOURCE_DIR "${NVCOMP_ROOT}/nvcomp" - BINARY_DIR "${NVCOMP_ROOT}/build" - INSTALL_DIR "${NVCOMP_ROOT}/install" - PATCH_COMMAND patch --reject-file=- -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/cmake/nvcomp.patch || true - CMAKE_ARGS ${NVCOMP_CMAKE_ARGS} -DCMAKE_INSTALL_PREFIX=${NVCOMP_ROOT}/install - BUILD_COMMAND ${CMAKE_COMMAND} --build . --target nvcomp - INSTALL_COMMAND ${CMAKE_COMMAND} -E echo "Skipping nvcomp install step.") diff --git a/java/src/main/native/cmake/nvcomp.patch b/java/src/main/native/cmake/nvcomp.patch deleted file mode 100644 index ea1340b7754..00000000000 --- a/java/src/main/native/cmake/nvcomp.patch +++ /dev/null @@ -1,15 +0,0 @@ -diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt -index 32f48ef..a2e3125 100644 ---- a/src/CMakeLists.txt -+++ b/src/CMakeLists.txt -@@ -10,7 +10,9 @@ endif() - file(GLOB CUDA_SOURCES *.cu) - file(GLOB CPP_SOURCES *.cpp) - --add_library(nvcomp SHARED ${CUDA_SOURCES} ${CPP_SOURCES}) -+ -+add_library(nvcomp STATIC ${CUDA_SOURCES} ${CPP_SOURCES}) -+set_property(TARGET nvcomp PROPERTY POSITION_INDEPENDENT_CODE True) - set_property(TARGET nvcomp PROPERTY CUDA_ARCHITECTURES ${GPU_ARCHS}) - target_compile_options(nvcomp PRIVATE - $<$:--expt-extended-lambda -Xcompiler -pthread>) diff --git a/java/src/main/native/src/NvcompJni.cpp b/java/src/main/native/src/NvcompJni.cpp index 0e34d2856f5..d551e9414d1 100644 --- a/java/src/main/native/src/NvcompJni.cpp +++ b/java/src/main/native/src/NvcompJni.cpp @@ -13,11 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - -#include -#include #include +#include +#include + +#include "check_nvcomp_output_sizes.hpp" #include "cudf_jni_apis.hpp" namespace { @@ -27,7 +28,7 @@ constexpr char const *NVCOMP_CUDA_ERROR_CLASS = "ai/rapids/cudf/nvcomp/NvcompCud constexpr char const *ILLEGAL_ARG_CLASS = "java/lang/IllegalArgumentException"; constexpr char const *UNSUPPORTED_CLASS = "java/lang/UnsupportedOperationException"; -void check_nvcomp_status(JNIEnv *env, nvcompError_t status) { +void check_nvcomp_status(JNIEnv *env, nvcompStatus_t status) { switch (status) { case nvcompSuccess: break; case nvcompErrorInvalidValue: @@ -36,9 +37,15 @@ void check_nvcomp_status(JNIEnv *env, nvcompError_t status) { case nvcompErrorNotSupported: cudf::jni::throw_java_exception(env, UNSUPPORTED_CLASS, "nvcomp unsupported"); break; + case nvcompErrorCannotDecompress: + cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "nvcomp cannot decompress"); + break; case nvcompErrorCudaError: cudf::jni::throw_java_exception(env, NVCOMP_CUDA_ERROR_CLASS, "nvcomp CUDA error"); break; + case nvcompErrorInternal: + cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "nvcomp internal error"); + break; default: cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "nvcomp unknown error"); break; @@ -49,74 +56,16 @@ void check_nvcomp_status(JNIEnv *env, nvcompError_t status) { extern "C" { -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_decompressGetMetadata( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jlong jstream) { - try { - cudf::jni::auto_set_device(env); - void *metadata_ptr; - auto stream = reinterpret_cast(jstream); - auto status = nvcompDecompressGetMetadata(reinterpret_cast(in_ptr), in_size, - &metadata_ptr, stream); - check_nvcomp_status(env, status); - return reinterpret_cast(metadata_ptr); - } - CATCH_STD(env, 0); -} - -JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_decompressDestroyMetadata( - JNIEnv *env, jclass, jlong metadata_ptr) { - try { - cudf::jni::auto_set_device(env); - nvcompDecompressDestroyMetadata(reinterpret_cast(metadata_ptr)); - } - CATCH_STD(env, ); -} - -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_decompressGetTempSize( - JNIEnv *env, jclass, jlong metadata_ptr) { - try { - cudf::jni::auto_set_device(env); - size_t temp_size; - auto status = nvcompDecompressGetTempSize(reinterpret_cast(metadata_ptr), &temp_size); - check_nvcomp_status(env, status); - return temp_size; - } - CATCH_STD(env, 0); -} - -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_decompressGetOutputSize( - JNIEnv *env, jclass, jlong metadata_ptr) { - try { - cudf::jni::auto_set_device(env); - size_t out_size; - auto status = nvcompDecompressGetOutputSize(reinterpret_cast(metadata_ptr), &out_size); - check_nvcomp_status(env, status); - return out_size; - } - CATCH_STD(env, 0); -} - -JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_decompressAsync( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jlong temp_ptr, jlong temp_size, - jlong metadata_ptr, jlong out_ptr, jlong out_size, jlong jstream) { - try { - cudf::jni::auto_set_device(env); - auto stream = reinterpret_cast(jstream); - auto status = nvcompDecompressAsync(reinterpret_cast(in_ptr), in_size, - reinterpret_cast(temp_ptr), temp_size, - reinterpret_cast(metadata_ptr), - reinterpret_cast(out_ptr), out_size, stream); - check_nvcomp_status(env, status); - } - CATCH_STD(env, ); -} - JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_isLZ4Data(JNIEnv *env, jclass, - jlong in_ptr, - jlong 
in_size) { + jlong j_in_ptr, + jlong j_in_size, + jlong j_stream) { try { cudf::jni::auto_set_device(env); - return LZ4IsData(reinterpret_cast(in_ptr), in_size); + auto in_ptr = reinterpret_cast(j_in_ptr); + auto in_size = static_cast(j_in_size); + auto stream = reinterpret_cast(j_stream); + return LZ4IsData(in_ptr, in_size, stream); } CATCH_STD(env, 0) } @@ -130,365 +79,215 @@ JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_isLZ4Metadata(JN CATCH_STD(env, 0) } -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4CompressGetTempSize( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jint input_type, jlong chunk_size) { - try { - cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); - nvcompLZ4FormatOpts opts{}; - opts.chunk_size = chunk_size; - size_t temp_size; - auto status = nvcompLZ4CompressGetTempSize(reinterpret_cast(in_ptr), in_size, comp_type, - &opts, &temp_size); - check_nvcomp_status(env, status); - return temp_size; - } - CATCH_STD(env, 0); -} - -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4CompressGetOutputSize( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jint input_type, jlong chunk_size, - jlong temp_ptr, jlong temp_size, jboolean compute_exact) { - try { - cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); - nvcompLZ4FormatOpts opts{}; - opts.chunk_size = chunk_size; - size_t out_size; - auto status = nvcompLZ4CompressGetOutputSize( - reinterpret_cast(in_ptr), in_size, comp_type, &opts, - reinterpret_cast(temp_ptr), temp_size, &out_size, compute_exact); - check_nvcomp_status(env, status); - return out_size; - } - CATCH_STD(env, 0); -} - -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4Compress( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jint input_type, jlong chunk_size, - jlong temp_ptr, jlong temp_size, jlong out_ptr, jlong out_size, jlong jstream) { +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4CompressConfigure( + JNIEnv *env, jclass, jlong j_chunk_size, jlong j_uncompressed_size) { try { cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); nvcompLZ4FormatOpts opts{}; - opts.chunk_size = chunk_size; - auto stream = reinterpret_cast(jstream); - size_t compressed_size = out_size; - auto status = - nvcompLZ4CompressAsync(reinterpret_cast(in_ptr), in_size, comp_type, &opts, - reinterpret_cast(temp_ptr), temp_size, - reinterpret_cast(out_ptr), &compressed_size, stream); + opts.chunk_size = static_cast(j_chunk_size); + auto uncompressed_size = static_cast(j_uncompressed_size); + std::size_t metadata_bytes = 0; + std::size_t temp_bytes = 0; + std::size_t out_bytes = 0; + auto status = nvcompLZ4CompressConfigure(&opts, NVCOMP_TYPE_CHAR, uncompressed_size, + &metadata_bytes, &temp_bytes, &out_bytes); check_nvcomp_status(env, status); - if (cudaStreamSynchronize(stream) != cudaSuccess) { - JNI_THROW_NEW(env, NVCOMP_CUDA_ERROR_CLASS, "Error synchronizing stream", 0); - } - return compressed_size; + cudf::jni::native_jlongArray result(env, 3); + result[0] = static_cast(metadata_bytes); + result[1] = static_cast(temp_bytes); + result[2] = static_cast(out_bytes); + return result.get_jArray(); } CATCH_STD(env, 0); } JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4CompressAsync( - JNIEnv *env, jclass, jlong compressed_output_ptr, jlong in_ptr, jlong in_size, jint input_type, - jlong chunk_size, jlong temp_ptr, jlong temp_size, jlong out_ptr, jlong out_size, - jlong jstream) { + JNIEnv *env, 
jclass, jlong j_compressed_size_ptr, jlong j_in_ptr, jlong j_in_size, + jint j_input_type, jlong j_chunk_size, jlong j_temp_ptr, jlong j_temp_size, jlong j_out_ptr, + jlong j_out_size, jlong j_stream) { try { cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); + auto in_ptr = reinterpret_cast(j_in_ptr); + auto in_size = static_cast(j_in_size); + auto comp_type = static_cast(j_input_type); nvcompLZ4FormatOpts opts{}; - opts.chunk_size = chunk_size; - auto stream = reinterpret_cast(jstream); - auto compressed_size_ptr = reinterpret_cast(compressed_output_ptr); - *compressed_size_ptr = out_size; - auto status = - nvcompLZ4CompressAsync(reinterpret_cast(in_ptr), in_size, comp_type, &opts, - reinterpret_cast(temp_ptr), temp_size, - reinterpret_cast(out_ptr), compressed_size_ptr, stream); + opts.chunk_size = static_cast(j_chunk_size); + auto temp_ptr = reinterpret_cast(j_temp_ptr); + auto temp_size = static_cast(j_temp_size); + auto out_ptr = reinterpret_cast(j_out_ptr); + auto compressed_size_ptr = reinterpret_cast(j_compressed_size_ptr); + auto stream = reinterpret_cast(j_stream); + auto status = nvcompLZ4CompressAsync(&opts, comp_type, in_ptr, in_size, temp_ptr, temp_size, + out_ptr, compressed_size_ptr, stream); check_nvcomp_status(env, status); } CATCH_STD(env, ); } -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4DecompressGetMetadata( - JNIEnv *env, jclass, jlongArray in_ptrs, jlongArray in_sizes, jlong jstream) { +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4DecompressConfigure( + JNIEnv *env, jclass, jlong j_input_ptr, jlong j_input_size, jlong j_stream) { try { cudf::jni::auto_set_device(env); - - cudf::jni::native_jpointerArray input_ptrs(env, in_ptrs); - cudf::jni::native_jlongArray input_jsizes(env, in_sizes); - if (input_ptrs.size() != input_jsizes.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "input array size mismatch"); - } - std::vector sizes; - std::transform(input_jsizes.data(), input_jsizes.data() + input_jsizes.size(), - std::back_inserter(sizes), - [](jlong x) -> size_t { return static_cast(x); }); - + auto compressed_ptr = reinterpret_cast(j_input_ptr); + auto compressed_bytes = static_cast(j_input_size); void *metadata_ptr = nullptr; - auto stream = reinterpret_cast(jstream); - auto status = nvcompBatchedLZ4DecompressGetMetadata(input_ptrs.data(), sizes.data(), - input_ptrs.size(), &metadata_ptr, stream); - check_nvcomp_status(env, status); - return reinterpret_cast(metadata_ptr); - } - CATCH_STD(env, 0); -} - -JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4DecompressDestroyMetadata( - JNIEnv *env, jclass, jlong metadata_ptr) { - try { - cudf::jni::auto_set_device(env); - nvcompBatchedLZ4DecompressDestroyMetadata(reinterpret_cast(metadata_ptr)); - } - CATCH_STD(env, ); -} - -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4DecompressGetTempSize( - JNIEnv *env, jclass, jlong metadata_ptr) { - try { - cudf::jni::auto_set_device(env); - size_t temp_size; + std::size_t metadata_bytes = 0; + std::size_t temp_bytes = 0; + std::size_t uncompressed_bytes = 0; + auto stream = reinterpret_cast(j_stream); auto status = - nvcompBatchedLZ4DecompressGetTempSize(reinterpret_cast(metadata_ptr), &temp_size); + nvcompLZ4DecompressConfigure(compressed_ptr, compressed_bytes, &metadata_ptr, + &metadata_bytes, &temp_bytes, &uncompressed_bytes, stream); check_nvcomp_status(env, status); - return static_cast(temp_size); + cudf::jni::native_jlongArray 
result(env, 4); + result[0] = reinterpret_cast(metadata_ptr); + result[1] = static_cast(metadata_bytes); + result[2] = static_cast(temp_bytes); + result[3] = static_cast(uncompressed_bytes); + return result.get_jArray(); } CATCH_STD(env, 0); } -JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4DecompressGetOutputSize( - JNIEnv *env, jclass, jlong metadata_ptr, jint num_outputs) { +JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4DecompressAsync( + JNIEnv *env, jclass, jlong j_in_ptr, jlong j_in_size, jlong j_metadata_ptr, + jlong j_metadata_size, jlong j_temp_ptr, jlong j_temp_size, jlong j_out_ptr, jlong j_out_size, + jlong j_stream) { try { cudf::jni::auto_set_device(env); - std::vector sizes(num_outputs); - auto status = nvcompBatchedLZ4DecompressGetOutputSize(reinterpret_cast(metadata_ptr), - num_outputs, sizes.data()); + auto compressed_ptr = reinterpret_cast(j_in_ptr); + auto compressed_bytes = static_cast(j_in_size); + auto metadata_ptr = reinterpret_cast(j_metadata_ptr); + auto metadata_bytes = static_cast(j_metadata_size); + auto temp_ptr = reinterpret_cast(j_temp_ptr); + auto temp_bytes = static_cast(j_temp_size); + auto uncompressed_ptr = reinterpret_cast(j_out_ptr); + auto uncompressed_bytes = static_cast(j_out_size); + auto stream = reinterpret_cast(j_stream); + auto status = nvcompLZ4DecompressAsync(compressed_ptr, compressed_bytes, metadata_ptr, + metadata_bytes, temp_ptr, temp_bytes, uncompressed_ptr, + uncompressed_bytes, stream); check_nvcomp_status(env, status); - cudf::jni::native_jlongArray jsizes(env, num_outputs); - std::transform(sizes.begin(), sizes.end(), jsizes.data(), - [](size_t x) -> jlong { return static_cast(x); }); - return jsizes.get_jArray(); } - CATCH_STD(env, NULL); + CATCH_STD(env, ); } -JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4DecompressAsync( - JNIEnv *env, jclass, jlongArray in_ptrs, jlongArray in_sizes, jlong temp_ptr, jlong temp_size, - jlong metadata_ptr, jlongArray out_ptrs, jlongArray out_sizes, jlong jstream) { +JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_lz4DestroyMetadata(JNIEnv *env, jclass, + jlong metadata_ptr) { try { cudf::jni::auto_set_device(env); - cudf::jni::native_jpointerArray input_ptrs(env, in_ptrs); - cudf::jni::native_jlongArray input_jsizes(env, in_sizes); - if (input_ptrs.size() != input_jsizes.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "input array size mismatch"); - } - std::vector input_sizes; - std::transform(input_jsizes.data(), input_jsizes.data() + input_jsizes.size(), - std::back_inserter(input_sizes), - [](jlong x) -> size_t { return static_cast(x); }); - - cudf::jni::native_jpointerArray output_ptrs(env, out_ptrs); - cudf::jni::native_jlongArray output_jsizes(env, out_sizes); - if (output_ptrs.size() != output_jsizes.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "output array size mismatch"); - } - if (input_ptrs.size() != output_ptrs.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "input/output array size mismatch"); - } - std::vector output_sizes; - std::transform(output_jsizes.data(), output_jsizes.data() + output_jsizes.size(), - std::back_inserter(output_sizes), - [](jlong x) -> size_t { return static_cast(x); }); - - auto stream = reinterpret_cast(jstream); - auto status = nvcompBatchedLZ4DecompressAsync( - input_ptrs.data(), input_sizes.data(), input_ptrs.size(), - reinterpret_cast(temp_ptr), static_cast(temp_size), - reinterpret_cast(metadata_ptr), 
output_ptrs.data(), output_sizes.data(), stream); - check_nvcomp_status(env, status); + nvcompLZ4DestroyMetadata(reinterpret_cast(metadata_ptr)); } CATCH_STD(env, ); } JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4CompressGetTempSize( - JNIEnv *env, jclass, jlongArray in_ptrs, jlongArray in_sizes, jlong chunk_size) { + JNIEnv *env, jclass, jlong j_batch_size, jlong j_max_chunk_size) { try { cudf::jni::auto_set_device(env); - cudf::jni::native_jpointerArray input_ptrs(env, in_ptrs); - cudf::jni::native_jlongArray input_jsizes(env, in_sizes); - if (input_ptrs.size() != input_jsizes.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "input array size mismatch"); - } - std::vector sizes; - std::transform(input_jsizes.data(), input_jsizes.data() + input_jsizes.size(), - std::back_inserter(sizes), - [](jlong x) -> size_t { return static_cast(x); }); - - nvcompLZ4FormatOpts opts{}; - opts.chunk_size = chunk_size; - size_t temp_size = 0; - auto status = nvcompBatchedLZ4CompressGetTempSize(input_ptrs.data(), sizes.data(), - input_ptrs.size(), &opts, &temp_size); + auto batch_size = static_cast(j_batch_size); + auto max_chunk_size = static_cast(j_max_chunk_size); + std::size_t temp_size = 0; + auto status = nvcompBatchedLZ4CompressGetTempSize(batch_size, max_chunk_size, + nvcompBatchedLZ4DefaultOpts, &temp_size); check_nvcomp_status(env, status); return static_cast(temp_size); } CATCH_STD(env, 0); } -JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4CompressGetOutputSize( - JNIEnv *env, jclass, jlongArray in_ptrs, jlongArray in_sizes, jlong chunk_size, jlong temp_ptr, - jlong temp_size) { +JNIEXPORT jlong JNICALL +Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4CompressGetMaxOutputChunkSize( + JNIEnv *env, jclass, jlong j_max_chunk_size) { try { cudf::jni::auto_set_device(env); - cudf::jni::native_jpointerArray input_ptrs(env, in_ptrs); - cudf::jni::native_jlongArray input_jsizes(env, in_sizes); - if (input_ptrs.size() != input_jsizes.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "input array size mismatch"); - } - std::vector input_sizes; - std::transform(input_jsizes.data(), input_jsizes.data() + input_jsizes.size(), - std::back_inserter(input_sizes), - [](jlong x) -> size_t { return static_cast(x); }); - - nvcompLZ4FormatOpts opts{}; - opts.chunk_size = chunk_size; - std::vector output_sizes(input_ptrs.size()); - auto status = nvcompBatchedLZ4CompressGetOutputSize( - input_ptrs.data(), input_sizes.data(), input_ptrs.size(), &opts, - reinterpret_cast(temp_ptr), static_cast(temp_size), output_sizes.data()); + auto max_chunk_size = static_cast(j_max_chunk_size); + std::size_t max_output_size = 0; + auto status = nvcompBatchedLZ4CompressGetMaxOutputChunkSize( + max_chunk_size, nvcompBatchedLZ4DefaultOpts, &max_output_size); check_nvcomp_status(env, status); - cudf::jni::native_jlongArray jsizes(env, input_ptrs.size()); - std::transform(output_sizes.begin(), output_sizes.end(), jsizes.data(), - [](size_t x) -> jlong { return static_cast(x); }); - return jsizes.get_jArray(); + return static_cast(max_output_size); } - CATCH_STD(env, NULL); + CATCH_STD(env, 0); } JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4CompressAsync( - JNIEnv *env, jclass, jlong compressed_sizes_out_ptr, jlongArray in_ptrs, jlongArray in_sizes, - jlong chunk_size, jlong temp_ptr, jlong temp_size, jlongArray out_ptrs, jlongArray out_sizes, - jlong jstream) { + JNIEnv *env, jclass, jlong j_in_ptrs, jlong j_in_sizes, 
jlong j_chunk_size, jlong j_batch_size, + jlong j_temp_ptr, jlong j_temp_size, jlong j_out_ptrs, jlong j_compressed_sizes_out_ptr, + jlong j_stream) { try { cudf::jni::auto_set_device(env); - cudf::jni::native_jpointerArray input_ptrs(env, in_ptrs); - cudf::jni::native_jlongArray input_jsizes(env, in_sizes); - if (input_ptrs.size() != input_jsizes.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "input array size mismatch"); - } - std::vector input_sizes; - std::transform(input_jsizes.data(), input_jsizes.data() + input_jsizes.size(), - std::back_inserter(input_sizes), - [](jlong x) -> size_t { return static_cast(x); }); - - cudf::jni::native_jpointerArray output_ptrs(env, out_ptrs); - cudf::jni::native_jlongArray output_jsizes(env, out_sizes); - if (output_ptrs.size() != output_jsizes.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "output array size mismatch"); - } - if (input_ptrs.size() != output_ptrs.size()) { - cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, "input/output array size mismatch"); - } - - auto output_sizes = reinterpret_cast(compressed_sizes_out_ptr); - std::transform(output_jsizes.data(), output_jsizes.data() + output_jsizes.size(), output_sizes, - [](jlong x) -> size_t { return static_cast(x); }); - - nvcompLZ4FormatOpts opts{}; - opts.chunk_size = chunk_size; - auto stream = reinterpret_cast(jstream); - auto status = nvcompBatchedLZ4CompressAsync( - input_ptrs.data(), input_sizes.data(), input_ptrs.size(), &opts, - reinterpret_cast(temp_ptr), static_cast(temp_size), output_ptrs.data(), - output_sizes, // input/output parameter - stream); + auto in_ptrs = reinterpret_cast(j_in_ptrs); + auto in_sizes = reinterpret_cast(j_in_sizes); + auto chunk_size = static_cast(j_chunk_size); + auto batch_size = static_cast(j_batch_size); + auto temp_ptr = reinterpret_cast(j_temp_ptr); + auto temp_size = static_cast(j_temp_size); + auto out_ptrs = reinterpret_cast(j_out_ptrs); + auto compressed_out_sizes = reinterpret_cast(j_compressed_sizes_out_ptr); + auto stream = reinterpret_cast(j_stream); + auto status = nvcompBatchedLZ4CompressAsync(in_ptrs, in_sizes, chunk_size, batch_size, temp_ptr, + temp_size, out_ptrs, compressed_out_sizes, + nvcompBatchedLZ4DefaultOpts, stream); check_nvcomp_status(env, status); } CATCH_STD(env, ); } -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_cascadedCompressGetTempSize( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jint input_type, jint num_rles, - jint num_deltas, jboolean use_bp) { - try { - cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); - nvcompCascadedFormatOpts opts{}; - opts.num_RLEs = num_rles; - opts.num_deltas = num_deltas; - opts.use_bp = use_bp; - size_t temp_size; - auto status = nvcompCascadedCompressGetTempSize(reinterpret_cast(in_ptr), in_size, - comp_type, &opts, &temp_size); - check_nvcomp_status(env, status); - return temp_size; - } - CATCH_STD(env, 0); -} - -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_cascadedCompressGetOutputSize( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jint input_type, jint num_rles, - jint num_deltas, jboolean use_bp, jlong temp_ptr, jlong temp_size, jboolean compute_exact) { +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4DecompressGetTempSize( + JNIEnv *env, jclass, jlong j_batch_size, jlong j_chunk_size) { try { cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); - nvcompCascadedFormatOpts opts{}; - opts.num_RLEs = num_rles; - 
opts.num_deltas = num_deltas; - opts.use_bp = use_bp; - size_t out_size; - auto status = nvcompCascadedCompressGetOutputSize( - reinterpret_cast(in_ptr), in_size, comp_type, &opts, - reinterpret_cast(temp_ptr), temp_size, &out_size, compute_exact); + auto batch_size = static_cast(j_batch_size); + auto chunk_size = static_cast(j_chunk_size); + std::size_t temp_size = 0; + auto status = nvcompBatchedLZ4DecompressGetTempSize(batch_size, chunk_size, &temp_size); check_nvcomp_status(env, status); - return out_size; + return static_cast(temp_size); } CATCH_STD(env, 0); } -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_cascadedCompress( - JNIEnv *env, jclass, jlong in_ptr, jlong in_size, jint input_type, jint num_rles, - jint num_deltas, jboolean use_bp, jlong temp_ptr, jlong temp_size, jlong out_ptr, - jlong out_size, jlong jstream) { +JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4DecompressAsync( + JNIEnv *env, jclass, jlong j_in_ptrs, jlong j_in_sizes, jlong j_out_sizes, jlong j_batch_size, + jlong j_temp_ptr, jlong j_temp_size, jlong j_out_ptrs, jlong j_stream) { try { cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); - nvcompCascadedFormatOpts opts{}; - opts.num_RLEs = num_rles; - opts.num_deltas = num_deltas; - opts.use_bp = use_bp; - auto stream = reinterpret_cast(jstream); - size_t compressed_size = out_size; - auto status = - nvcompCascadedCompressAsync(reinterpret_cast(in_ptr), in_size, comp_type, &opts, - reinterpret_cast(temp_ptr), temp_size, - reinterpret_cast(out_ptr), &compressed_size, stream); + auto compressed_ptrs = reinterpret_cast(j_in_ptrs); + auto compressed_sizes = reinterpret_cast(j_in_sizes); + auto uncompressed_sizes = reinterpret_cast(j_out_sizes); + auto batch_size = static_cast(j_batch_size); + auto temp_ptr = reinterpret_cast(j_temp_ptr); + auto temp_size = static_cast(j_temp_size); + auto uncompressed_ptrs = reinterpret_cast(j_out_ptrs); + auto stream = reinterpret_cast(j_stream); + auto uncompressed_statuses = rmm::device_uvector(batch_size, stream); + auto actual_uncompressed_sizes = rmm::device_uvector(batch_size, stream); + auto status = nvcompBatchedLZ4DecompressAsync( + compressed_ptrs, compressed_sizes, uncompressed_sizes, actual_uncompressed_sizes.data(), + batch_size, temp_ptr, temp_size, uncompressed_ptrs, uncompressed_statuses.data(), stream); check_nvcomp_status(env, status); - if (cudaStreamSynchronize(stream) != cudaSuccess) { - JNI_THROW_NEW(env, NVCOMP_CUDA_ERROR_CLASS, "Error synchronizing stream", 0); + if (!cudf::java::check_nvcomp_output_sizes(uncompressed_sizes, actual_uncompressed_sizes.data(), + batch_size, stream)) { + cudf::jni::throw_java_exception(env, NVCOMP_ERROR_CLASS, + "nvcomp decompress output size mismatch"); } - return compressed_size; } - CATCH_STD(env, 0); + CATCH_STD(env, ); } -JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_cascadedCompressAsync( - JNIEnv *env, jclass, jlong compressed_output_ptr, jlong in_ptr, jlong in_size, jint input_type, - jint num_rles, jint num_deltas, jboolean use_bp, jlong temp_ptr, jlong temp_size, jlong out_ptr, - jlong out_size, jlong jstream) { +JNIEXPORT void JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4GetDecompressSizeAsync( + JNIEnv *env, jclass, jlong j_in_ptrs, jlong j_in_sizes, jlong j_out_sizes, jlong j_batch_size, + jlong j_stream) { try { cudf::jni::auto_set_device(env); - auto comp_type = static_cast(input_type); - nvcompCascadedFormatOpts opts{}; - opts.num_RLEs = num_rles; - opts.num_deltas = 
num_deltas; - opts.use_bp = use_bp; - auto stream = reinterpret_cast(jstream); - auto compressed_size_ptr = reinterpret_cast(compressed_output_ptr); - *compressed_size_ptr = out_size; - auto status = - nvcompCascadedCompressAsync(reinterpret_cast(in_ptr), in_size, comp_type, &opts, - reinterpret_cast(temp_ptr), temp_size, - reinterpret_cast(out_ptr), compressed_size_ptr, stream); + auto compressed_ptrs = reinterpret_cast(j_in_ptrs); + auto compressed_sizes = reinterpret_cast(j_in_sizes); + auto uncompressed_sizes = reinterpret_cast(j_out_sizes); + auto batch_size = static_cast(j_batch_size); + auto stream = reinterpret_cast(j_stream); + auto status = nvcompBatchedLZ4GetDecompressSizeAsync(compressed_ptrs, compressed_sizes, + uncompressed_sizes, batch_size, stream); check_nvcomp_status(env, status); } CATCH_STD(env, ); diff --git a/java/src/main/native/src/check_nvcomp_output_sizes.cu b/java/src/main/native/src/check_nvcomp_output_sizes.cu new file mode 100644 index 00000000000..944399882b8 --- /dev/null +++ b/java/src/main/native/src/check_nvcomp_output_sizes.cu @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include + +#include "check_nvcomp_output_sizes.hpp" + +namespace { + +struct java_domain { + static constexpr char const *name{"Java"}; +}; + +} // anonymous namespace + +namespace cudf { +namespace java { + +/** + * Check that the vector of expected uncompressed sizes matches the vector of actual compressed + * sizes. Both vectors are assumed to be in device memory and contain num_chunks elements. + */ +bool check_nvcomp_output_sizes(std::size_t const *dev_uncompressed_sizes, + std::size_t const *dev_actual_uncompressed_sizes, + std::size_t num_chunks, rmm::cuda_stream_view stream) { + NVTX3_FUNC_RANGE_IN(java_domain); + return thrust::equal(rmm::exec_policy(stream), dev_uncompressed_sizes, + dev_uncompressed_sizes + num_chunks, dev_actual_uncompressed_sizes); +} + +} // namespace java +} // namespace cudf diff --git a/java/src/main/native/src/check_nvcomp_output_sizes.hpp b/java/src/main/native/src/check_nvcomp_output_sizes.hpp new file mode 100644 index 00000000000..00b36471a85 --- /dev/null +++ b/java/src/main/native/src/check_nvcomp_output_sizes.hpp @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <cstddef> +#include <rmm/cuda_stream_view.hpp> + +namespace cudf { +namespace java { + +/** + * Check that the vector of expected uncompressed sizes matches the vector of actual uncompressed + * sizes. Both vectors are assumed to be in device memory and contain num_chunks elements. + */ +bool check_nvcomp_output_sizes(std::size_t const *dev_uncompressed_sizes, + std::size_t const *dev_actual_uncompressed_sizes, + std::size_t num_chunks, rmm::cuda_stream_view stream); +} // namespace java +} // namespace cudf diff --git a/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java b/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java index a41cc22e9b2..c36d241500a 100644 --- a/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java +++ b/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,18 +29,20 @@ public class NvcompTest { private static final Logger log = LoggerFactory.getLogger(ColumnVector.class); @Test - void testLZ4RoundTripSync() { + void testLZ4RoundTripViaLZ4DecompressorSync() { lz4RoundTrip(false); } @Test - void testLZ4RoundTripAsync() { + void testLZ4RoundTripViaLZ4DecompressorAsync() { lz4RoundTrip(true); } @Test void testBatchedLZ4RoundTripAsync() { + final Cuda.Stream stream = Cuda.DEFAULT_STREAM; final long chunkSize = 64 * 1024; + final long targetIntermediateSize = Long.MAX_VALUE; final int maxElements = 1024 * 1024 + 1; final int numBuffers = 200; long[] data = new long[maxElements]; @@ -48,149 +50,52 @@ void testBatchedLZ4RoundTripAsync() { data[i] = i; } - DeviceMemoryBuffer[] originalBuffers = new DeviceMemoryBuffer[numBuffers]; - DeviceMemoryBuffer[] uncompressedBuffers = new DeviceMemoryBuffer[numBuffers]; - - // compressed data in buffers that are likely oversized - DeviceMemoryBuffer[] compressedBuffers = new DeviceMemoryBuffer[numBuffers]; - - // compressed data in right-sized buffers - DeviceMemoryBuffer[] compressedInputs = new DeviceMemoryBuffer[numBuffers]; - - try { + try (CloseableArray<DeviceMemoryBuffer> originalBuffers = + CloseableArray.wrap(new DeviceMemoryBuffer[numBuffers])) { // create the batched buffers to compress - for (int i = 0; i < numBuffers; ++i) { - originalBuffers[i] = initBatchBuffer(data, i); + for (int i = 0; i < originalBuffers.size(); i++) { + originalBuffers.set(i, initBatchBuffer(data, i)); + // Increment the refcount since compression will try to close it + originalBuffers.get(i).incRefCount(); } - // compress the buffers - long[] outputSizes; - long[] compressedSizes; - long tempSize = BatchedLZ4Compressor.getTempSize(originalBuffers, chunkSize); - try (DeviceMemoryBuffer tempBuffer = DeviceMemoryBuffer.allocate(tempSize)) { - outputSizes = BatchedLZ4Compressor.getOutputSizes(originalBuffers, chunkSize, tempBuffer); - for (int i = 0; i < numBuffers; ++i) { - compressedBuffers[i] = DeviceMemoryBuffer.allocate(outputSizes[i]); - } - long sizesBufferSize = BatchedLZ4Compressor.getCompressedSizesBufferSize(numBuffers); - try (HostMemoryBuffer compressedSizesBuffer = HostMemoryBuffer.allocate(sizesBufferSize)) { - BatchedLZ4Compressor.compressAsync(compressedSizesBuffer, originalBuffers, chunkSize, - tempBuffer, compressedBuffers, Cuda.DEFAULT_STREAM); - Cuda.DEFAULT_STREAM.sync(); - compressedSizes = new long[numBuffers]; - for (int i = 0; i < numBuffers; ++i) { - compressedSizes[i] =
compressedSizesBuffer.getLong(i * 8); - } - } - } - - // right-size the compressed buffers based on reported compressed sizes - for (int i = 0; i < numBuffers; ++i) { - compressedInputs[i] = compressedBuffers[i].slice(0, compressedSizes[i]); - } - - // decompress the buffers - try (BatchedLZ4Decompressor.BatchedMetadata metadata = - BatchedLZ4Decompressor.getMetadata(compressedInputs, Cuda.DEFAULT_STREAM)) { - outputSizes = BatchedLZ4Decompressor.getOutputSizes(metadata, numBuffers); - for (int i = 0; i < numBuffers; ++i) { - uncompressedBuffers[i] = DeviceMemoryBuffer.allocate(outputSizes[i]); - } - tempSize = BatchedLZ4Decompressor.getTempSize(metadata); - try (DeviceMemoryBuffer tempBuffer = DeviceMemoryBuffer.allocate(tempSize)) { - BatchedLZ4Decompressor.decompressAsync(compressedInputs, tempBuffer, metadata, - uncompressedBuffers, Cuda.DEFAULT_STREAM); - } - } + // compress and decompress the buffers + BatchedLZ4Compressor compressor = new BatchedLZ4Compressor(chunkSize, targetIntermediateSize); - // check the decompressed results against the original - for (int i = 0; i < numBuffers; ++i) { - try (HostMemoryBuffer expected = HostMemoryBuffer.allocate(originalBuffers[i].getLength()); - HostMemoryBuffer actual = HostMemoryBuffer.allocate(outputSizes[i])) { - Assertions.assertTrue(expected.getLength() <= Integer.MAX_VALUE); - Assertions.assertTrue(actual.getLength() <= Integer.MAX_VALUE); - Assertions.assertEquals(originalBuffers[i].getLength(), uncompressedBuffers[i].getLength(), - "uncompressed size mismatch at buffer " + i); - expected.copyFromDeviceBuffer(originalBuffers[i]); - actual.copyFromDeviceBuffer(uncompressedBuffers[i]); - byte[] expectedBytes = new byte[(int) expected.getLength()]; - expected.getBytes(expectedBytes, 0, 0, expected.getLength()); - byte[] actualBytes = new byte[(int) actual.getLength()]; - actual.getBytes(actualBytes, 0, 0, actual.getLength()); - Assertions.assertArrayEquals(expectedBytes, actualBytes, - "mismatch in batch buffer " + i); + try (CloseableArray<DeviceMemoryBuffer> compressedBuffers = + CloseableArray.wrap(compressor.compress(originalBuffers.getArray(), stream)); + CloseableArray<DeviceMemoryBuffer> uncompressedBuffers = + CloseableArray.wrap(new DeviceMemoryBuffer[numBuffers])) { + for (int i = 0; i < numBuffers; i++) { + uncompressedBuffers.set(i, + DeviceMemoryBuffer.allocate(originalBuffers.get(i).getLength())); } - } - } finally { - closeBufferArray(originalBuffers); - closeBufferArray(uncompressedBuffers); - closeBufferArray(compressedBuffers); - closeBufferArray(compressedInputs); - } - } - - @Test - void testBatchedLZ4CompressRoundTrip() { - final long chunkSize = 64 * 1024; - final int maxElements = 1024 * 1024 + 1; - final int numBuffers = 200; - long[] data = new long[maxElements]; - for (int i = 0; i < maxElements; ++i) { - data[i] = i; - } - - DeviceMemoryBuffer[] originalBuffers = new DeviceMemoryBuffer[numBuffers]; - DeviceMemoryBuffer[] uncompressedBuffers = new DeviceMemoryBuffer[numBuffers]; - BatchedLZ4Compressor.BatchedCompressionResult compResult = null; - - // compressed data in right-sized buffers - DeviceMemoryBuffer[] compressedInputs = new DeviceMemoryBuffer[numBuffers]; - - try { - // create the batched buffers to compress - for (int i = 0; i < numBuffers; ++i) { - originalBuffers[i] = initBatchBuffer(data, i); - } - // compress the buffers - compResult = BatchedLZ4Compressor.compress(originalBuffers, chunkSize, Cuda.DEFAULT_STREAM); + // decompress takes ownership of the compressed buffers and will close them + 
BatchedLZ4Decompressor.decompressAsync(chunkSize, compressedBuffers.release(), + uncompressedBuffers.getArray(), stream); - // right-size the compressed buffers based on reported compressed sizes - DeviceMemoryBuffer[] compressedBuffers = compResult.getCompressedBuffers(); - long[] compressedSizes = compResult.getCompressedSizes(); - for (int i = 0; i < numBuffers; ++i) { - compressedInputs[i] = compressedBuffers[i].slice(0, compressedSizes[i]); - } - - // decompress the buffers - uncompressedBuffers = BatchedLZ4Decompressor.decompressAsync(compressedInputs, - Cuda.DEFAULT_STREAM); - - // check the decompressed results against the original - for (int i = 0; i < numBuffers; ++i) { - try (HostMemoryBuffer expected = HostMemoryBuffer.allocate(originalBuffers[i].getLength()); - HostMemoryBuffer actual = HostMemoryBuffer.allocate(uncompressedBuffers[i].getLength())) { - Assertions.assertTrue(expected.getLength() <= Integer.MAX_VALUE); - Assertions.assertTrue(actual.getLength() <= Integer.MAX_VALUE); - Assertions.assertEquals(originalBuffers[i].getLength(), uncompressedBuffers[i].getLength(), - "uncompressed size mismatch at buffer " + i); - expected.copyFromDeviceBuffer(originalBuffers[i]); - actual.copyFromDeviceBuffer(uncompressedBuffers[i]); - byte[] expectedBytes = new byte[(int) expected.getLength()]; - expected.getBytes(expectedBytes, 0, 0, expected.getLength()); - byte[] actualBytes = new byte[(int) actual.getLength()]; - actual.getBytes(actualBytes, 0, 0, actual.getLength()); - Assertions.assertArrayEquals(expectedBytes, actualBytes, - "mismatch in batch buffer " + i); + // check the decompressed results against the original + for (int i = 0; i < numBuffers; ++i) { + try (HostMemoryBuffer expected = + HostMemoryBuffer.allocate(originalBuffers.get(i).getLength()); + HostMemoryBuffer actual = + HostMemoryBuffer.allocate(uncompressedBuffers.get(i).getLength())) { + Assertions.assertTrue(expected.getLength() <= Integer.MAX_VALUE); + Assertions.assertTrue(actual.getLength() <= Integer.MAX_VALUE); + Assertions.assertEquals(expected.getLength(), actual.getLength(), + "uncompressed size mismatch at buffer " + i); + expected.copyFromDeviceBuffer(originalBuffers.get(i)); + actual.copyFromDeviceBuffer(uncompressedBuffers.get(i)); + byte[] expectedBytes = new byte[(int) expected.getLength()]; + expected.getBytes(expectedBytes, 0, 0, expected.getLength()); + byte[] actualBytes = new byte[(int) actual.getLength()]; + actual.getBytes(actualBytes, 0, 0, actual.getLength()); + Assertions.assertArrayEquals(expectedBytes, actualBytes, + "mismatch in batch buffer " + i); + } } } - } finally { - closeBufferArray(originalBuffers); - closeBufferArray(uncompressedBuffers); - closeBufferArray(compressedInputs); - if (compResult != null) { - closeBufferArray(compResult.getCompressedBuffers()); - } } } @@ -200,14 +105,6 @@ private void closeBuffer(MemoryBuffer buffer) { } } - private void closeBufferArray(MemoryBuffer[] buffers) { - for (MemoryBuffer buffer : buffers) { - if (buffer != null) { - buffer.close(); - } - } - } - private DeviceMemoryBuffer initBatchBuffer(long[] data, int bufferId) { // grab a subsection of the data based on buffer ID int dataStart = 0; @@ -239,6 +136,7 @@ private DeviceMemoryBuffer initBatchBuffer(long[] data, int bufferId) { } private void lz4RoundTrip(boolean useAsync) { + final Cuda.Stream stream = Cuda.DEFAULT_STREAM; final long chunkSize = 64 * 1024; final int numElements = 10 * 1024 * 1024 + 1; long[] data = new long[numElements]; @@ -251,31 +149,32 @@ private void 
lz4RoundTrip(boolean useAsync) { DeviceMemoryBuffer uncompressedBuffer = null; try (ColumnVector v = ColumnVector.fromLongs(data)) { BaseDeviceMemoryBuffer inputBuffer = v.getDeviceBufferFor(BufferType.DATA); - log.debug("Uncompressed size is {}", inputBuffer.getLength()); - - long tempSize = LZ4Compressor.getTempSize(inputBuffer, CompressionType.CHAR, chunkSize); - - log.debug("Using {} temporary space for lz4 compression", tempSize); - tempBuffer = DeviceMemoryBuffer.allocate(tempSize); + final long uncompressedSize = inputBuffer.getLength(); + log.debug("Uncompressed size is {}", uncompressedSize); - long outSize = LZ4Compressor.getOutputSize(inputBuffer, CompressionType.CHAR, chunkSize, - tempBuffer); - log.debug("lz4 compressed size estimate is {}", outSize); + LZ4Compressor.Configuration compressConf = + LZ4Compressor.configure(chunkSize, uncompressedSize); + Assertions.assertTrue(compressConf.getMetadataBytes() > 0); + log.debug("Using {} temporary space for lz4 compression", compressConf.getTempBytes()); + tempBuffer = DeviceMemoryBuffer.allocate(compressConf.getTempBytes()); + log.debug("lz4 compressed size estimate is {}", compressConf.getMaxCompressedBytes()); - compressedBuffer = DeviceMemoryBuffer.allocate(outSize); + compressedBuffer = DeviceMemoryBuffer.allocate(compressConf.getMaxCompressedBytes()); long startTime = System.nanoTime(); long compressedSize; if (useAsync) { - try (HostMemoryBuffer tempHostBuffer = HostMemoryBuffer.allocate(8)) { - LZ4Compressor.compressAsync(tempHostBuffer, inputBuffer, CompressionType.CHAR, chunkSize, - tempBuffer, compressedBuffer, Cuda.DEFAULT_STREAM); - Cuda.DEFAULT_STREAM.sync(); - compressedSize = tempHostBuffer.getLong(0); + try (DeviceMemoryBuffer devCompressedSizeBuffer = DeviceMemoryBuffer.allocate(8); + HostMemoryBuffer hostCompressedSizeBuffer = HostMemoryBuffer.allocate(8)) { + LZ4Compressor.compressAsync(devCompressedSizeBuffer, inputBuffer, CompressionType.CHAR, + chunkSize, tempBuffer, compressedBuffer, stream); + hostCompressedSizeBuffer.copyFromDeviceBufferAsync(devCompressedSizeBuffer, stream); + stream.sync(); + compressedSize = hostCompressedSizeBuffer.getLong(0); } } else { compressedSize = LZ4Compressor.compress(inputBuffer, CompressionType.CHAR, chunkSize, - tempBuffer, compressedBuffer, Cuda.DEFAULT_STREAM); + tempBuffer, compressedBuffer, stream); } double duration = (System.nanoTime() - startTime) / 1000.0; log.info("Compressed with lz4 to {} in {} us", compressedSize, duration); @@ -283,23 +182,20 @@ private void lz4RoundTrip(boolean useAsync) { tempBuffer.close(); tempBuffer = null; - Assertions.assertTrue(Decompressor.isLZ4Data(compressedBuffer)); - - try (Decompressor.Metadata metadata = - Decompressor.getMetadata(compressedBuffer, Cuda.DEFAULT_STREAM)) { - Assertions.assertTrue(metadata.isLZ4Metadata()); - tempSize = Decompressor.getTempSize(metadata); + try (LZ4Decompressor.Configuration decompressConf = + LZ4Decompressor.configure(compressedBuffer, stream)) { + final long tempSize = decompressConf.getTempBytes(); log.debug("Using {} temporary space for lz4 compression", tempSize); tempBuffer = DeviceMemoryBuffer.allocate(tempSize); - outSize = Decompressor.getOutputSize(metadata); + final long outSize = decompressConf.getUncompressedBytes(); Assertions.assertEquals(inputBuffer.getLength(), outSize); uncompressedBuffer = DeviceMemoryBuffer.allocate(outSize); - Decompressor.decompressAsync(compressedBuffer, tempBuffer, metadata, uncompressedBuffer, - Cuda.DEFAULT_STREAM); + 
LZ4Decompressor.decompressAsync(compressedBuffer, decompressConf, tempBuffer, + uncompressedBuffer, stream); try (ColumnVector v2 = new ColumnVector( DType.INT64, @@ -324,133 +220,4 @@ private void lz4RoundTrip(boolean useAsync) { closeBuffer(uncompressedBuffer); } } - - @Test - void testCascadedRoundTripSync() { - cascadedRoundTrip(false); - } - - @Test - void testCascadedRoundTripAsync() { - cascadedRoundTrip(true); - } - - private void cascadedRoundTrip(boolean useAsync) { - final int numElements = 10 * 1024 * 1024 + 1; - final int numRunLengthEncodings = 2; - final int numDeltas = 1; - final boolean useBitPacking = true; - int[] data = new int[numElements]; - for (int i = 0; i < numElements; ++i) { - data[i] = i; - } - - DeviceMemoryBuffer tempBuffer = null; - DeviceMemoryBuffer compressedBuffer = null; - DeviceMemoryBuffer uncompressedBuffer = null; - try (ColumnVector v = ColumnVector.fromInts(data)) { - BaseDeviceMemoryBuffer inputBuffer = v.getDeviceBufferFor(BufferType.DATA); - log.debug("Uncompressed size is " + inputBuffer.getLength()); - - long tempSize = NvcompJni.cascadedCompressGetTempSize( - inputBuffer.getAddress(), - inputBuffer.getLength(), - CompressionType.INT.nativeId, - numRunLengthEncodings, - numDeltas, - useBitPacking); - - log.debug("Using {} temporary space for cascaded compression", tempSize); - tempBuffer = DeviceMemoryBuffer.allocate(tempSize); - - long outSize = NvcompJni.cascadedCompressGetOutputSize( - inputBuffer.getAddress(), - inputBuffer.getLength(), - CompressionType.INT.nativeId, - numRunLengthEncodings, - numDeltas, - useBitPacking, - tempBuffer.getAddress(), - tempBuffer.getLength(), - false); - log.debug("Inexact cascaded compressed size estimate is {}", outSize); - - compressedBuffer = DeviceMemoryBuffer.allocate(outSize); - - long startTime = System.nanoTime(); - long compressedSize; - if (useAsync) { - try (HostMemoryBuffer tempHostBuffer = HostMemoryBuffer.allocate(8)) { - NvcompJni.cascadedCompressAsync( - tempHostBuffer.getAddress(), - inputBuffer.getAddress(), - inputBuffer.getLength(), - CompressionType.INT.nativeId, - numRunLengthEncodings, - numDeltas, - useBitPacking, - tempBuffer.getAddress(), - tempBuffer.getLength(), - compressedBuffer.getAddress(), - compressedBuffer.getLength(), - 0); - Cuda.DEFAULT_STREAM.sync(); - compressedSize = tempHostBuffer.getLong(0); - } - } else { - compressedSize = NvcompJni.cascadedCompress( - inputBuffer.getAddress(), - inputBuffer.getLength(), - CompressionType.INT.nativeId, - numRunLengthEncodings, - numDeltas, - useBitPacking, - tempBuffer.getAddress(), - tempBuffer.getLength(), - compressedBuffer.getAddress(), - compressedBuffer.getLength(), - 0); - } - - double duration = (System.nanoTime() - startTime) / 1000.0; - log.debug("Compressed with cascaded to {} in {} us", compressedSize, duration); - - tempBuffer.close(); - tempBuffer = null; - - try (Decompressor.Metadata metadata = - Decompressor.getMetadata(compressedBuffer, Cuda.DEFAULT_STREAM)) { - tempSize = Decompressor.getTempSize(metadata); - - log.debug("Using {} temporary space for cascaded compression", tempSize); - tempBuffer = DeviceMemoryBuffer.allocate(tempSize); - - outSize = Decompressor.getOutputSize(metadata); - Assertions.assertEquals(inputBuffer.getLength(), outSize); - - uncompressedBuffer = DeviceMemoryBuffer.allocate(outSize); - - Decompressor.decompressAsync(compressedBuffer, tempBuffer, metadata, uncompressedBuffer, - Cuda.DEFAULT_STREAM); - - try (ColumnVector v2 = new ColumnVector( - DType.INT32, - numElements, - 
Optional.empty(), - uncompressedBuffer, - null, - null)) { - uncompressedBuffer = null; - try (ColumnVector compare = v2.equalTo(v); - Scalar compareAll = compare.all()) { - Assertions.assertTrue(compareAll.getBoolean()); - } - } - } - } finally { - closeBuffer(tempBuffer); - closeBuffer(compressedBuffer); - closeBuffer(uncompressedBuffer); - } - } }
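For reference, here is a minimal, self-contained sketch of the batched LZ4 round trip that the updated NvcompTest exercises: compress a set of device buffers with BatchedLZ4Compressor, then hand the compressed buffers to BatchedLZ4Decompressor.decompressAsync, which takes ownership of them. It only uses classes and methods that appear in this diff (CloseableArray, BatchedLZ4Compressor, BatchedLZ4Decompressor); the class name BatchedLz4Example, the buffer counts and sizes, and the way the inputs are filled are invented for illustration, and the Long.MAX_VALUE target intermediate size simply mirrors the test.

    import ai.rapids.cudf.CloseableArray;
    import ai.rapids.cudf.Cuda;
    import ai.rapids.cudf.DeviceMemoryBuffer;
    import ai.rapids.cudf.HostMemoryBuffer;
    import ai.rapids.cudf.nvcomp.BatchedLZ4Compressor;
    import ai.rapids.cudf.nvcomp.BatchedLZ4Decompressor;

    public class BatchedLz4Example {
      public static void main(String[] args) {
        final Cuda.Stream stream = Cuda.DEFAULT_STREAM;
        final long chunkSize = 64 * 1024;   // nvcomp LZ4 chunk size
        final int numBuffers = 4;
        final int numLongs = 1000;
        try (CloseableArray<DeviceMemoryBuffer> inputs =
                 CloseableArray.wrap(new DeviceMemoryBuffer[numBuffers])) {
          // build some device buffers to compress
          for (int i = 0; i < inputs.size(); i++) {
            try (HostMemoryBuffer host = HostMemoryBuffer.allocate(numLongs * 8L)) {
              for (int j = 0; j < numLongs; j++) {
                host.setLong(j * 8L, j);
              }
              DeviceMemoryBuffer dev = DeviceMemoryBuffer.allocate(host.getLength());
              dev.copyFromHostBuffer(host);
              inputs.set(i, dev);
            }
            // compress() closes its inputs, so keep our own reference alive
            inputs.get(i).incRefCount();
          }
          BatchedLZ4Compressor compressor = new BatchedLZ4Compressor(chunkSize, Long.MAX_VALUE);
          try (CloseableArray<DeviceMemoryBuffer> compressed =
                   CloseableArray.wrap(compressor.compress(inputs.getArray(), stream));
               CloseableArray<DeviceMemoryBuffer> uncompressed =
                   CloseableArray.wrap(new DeviceMemoryBuffer[numBuffers])) {
            // outputs must be sized to the expected uncompressed lengths
            for (int i = 0; i < numBuffers; i++) {
              uncompressed.set(i, DeviceMemoryBuffer.allocate(inputs.get(i).getLength()));
            }
            // decompressAsync takes ownership of the compressed buffers, hence release()
            BatchedLZ4Decompressor.decompressAsync(chunkSize, compressed.release(),
                uncompressed.getArray(), stream);
            stream.sync();
            // uncompressed now holds the original bytes
          }
        }
      }
    }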
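The lz4RoundTrip changes move the single-buffer path from the old getTempSize/getOutputSize calls to LZ4Compressor.configure and LZ4Decompressor.configure. A rough usage sketch of that flow follows; the class name Lz4ConfigureExample and the buffer sizes are invented for illustration, and the input buffer is left unfilled here, where a real caller would first copy data into it.

    import ai.rapids.cudf.Cuda;
    import ai.rapids.cudf.DeviceMemoryBuffer;
    import ai.rapids.cudf.nvcomp.CompressionType;
    import ai.rapids.cudf.nvcomp.LZ4Compressor;
    import ai.rapids.cudf.nvcomp.LZ4Decompressor;

    public class Lz4ConfigureExample {
      public static void main(String[] args) {
        final Cuda.Stream stream = Cuda.DEFAULT_STREAM;
        final long chunkSize = 64 * 1024;
        final long inputSize = 1024 * 1024;
        try (DeviceMemoryBuffer input = DeviceMemoryBuffer.allocate(inputSize)) {
          // query temp and maximum output sizes up front from the compressor configuration
          LZ4Compressor.Configuration compressConf = LZ4Compressor.configure(chunkSize, inputSize);
          try (DeviceMemoryBuffer temp = DeviceMemoryBuffer.allocate(compressConf.getTempBytes());
               DeviceMemoryBuffer compressed =
                   DeviceMemoryBuffer.allocate(compressConf.getMaxCompressedBytes())) {
            long compressedSize = LZ4Compressor.compress(input, CompressionType.CHAR, chunkSize,
                temp, compressed, stream);
            System.out.println("compressed to " + compressedSize + " bytes");
            // decompression sizes come from a Configuration parsed out of the compressed buffer
            try (LZ4Decompressor.Configuration decompressConf =
                     LZ4Decompressor.configure(compressed, stream);
                 DeviceMemoryBuffer decompTemp =
                     DeviceMemoryBuffer.allocate(decompressConf.getTempBytes());
                 DeviceMemoryBuffer output =
                     DeviceMemoryBuffer.allocate(decompressConf.getUncompressedBytes())) {
              LZ4Decompressor.decompressAsync(compressed, decompressConf, decompTemp, output,
                  stream);
              stream.sync();
            }
          }
        }
      }
    }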
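With this patch the size consistency check happens on the device inside batchedLZ4DecompressAsync (via check_nvcomp_output_sizes), while the test still verifies the decompressed contents by copying both buffers back to the host. A small helper along the lines of the test's verification loop could look like the following; the class and method names are invented for illustration, and it assumes each buffer fits in a Java byte array.

    import java.util.Arrays;
    import ai.rapids.cudf.DeviceMemoryBuffer;
    import ai.rapids.cudf.HostMemoryBuffer;

    public class CompareBuffersExample {
      /** Copy two device buffers to the host and compare their bytes. */
      public static boolean sameContents(DeviceMemoryBuffer expected, DeviceMemoryBuffer actual) {
        if (expected.getLength() != actual.getLength()) {
          return false;
        }
        int len = (int) expected.getLength();   // assumes the buffer fits in an int-sized array
        try (HostMemoryBuffer hostExpected = HostMemoryBuffer.allocate(len);
             HostMemoryBuffer hostActual = HostMemoryBuffer.allocate(len)) {
          hostExpected.copyFromDeviceBuffer(expected);
          hostActual.copyFromDeviceBuffer(actual);
          byte[] expectedBytes = new byte[len];
          byte[] actualBytes = new byte[len];
          hostExpected.getBytes(expectedBytes, 0, 0, len);
          hostActual.getBytes(actualBytes, 0, 0, len);
          return Arrays.equals(expectedBytes, actualBytes);
        }
      }
    }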