From 90cc39c2fe370c8dd1e320bef8fb2de2865dbefe Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 18 Aug 2023 22:27:10 -0700 Subject: [PATCH 1/6] Add HostMemoryAllocator interface Fixes NVIDIA/spark-rapids#8884 Signed-off-by: Gera Shegalov --- .../main/java/ai/rapids/cudf/ColumnView.java | 20 +++++---- .../cudf/DefaultHostMemoryAllocator.java | 36 ++++++++++++++++ .../ai/rapids/cudf/HostMemoryAllocator.java | 39 +++++++++++++++++ .../ai/rapids/cudf/JCudfSerialization.java | 6 ++- .../java/ai/rapids/cudf/PinnedMemoryPool.java | 3 +- java/src/main/java/ai/rapids/cudf/Table.java | 14 ++++--- .../cudf/nvcomp/BatchedLZ4Compressor.java | 17 +++----- .../java/ai/rapids/cudf/ColumnVectorTest.java | 3 +- .../cudf/ColumnViewNonEmptyNullsTest.java | 4 +- .../test/java/ai/rapids/cudf/CuFileTest.java | 15 ++++--- .../java/ai/rapids/cudf/GatherMapTest.java | 8 ++-- .../java/ai/rapids/cudf/MemoryBufferTest.java | 42 ++++++++++--------- .../test/java/ai/rapids/cudf/TableTest.java | 16 +++---- .../ai/rapids/cudf/nvcomp/NvcompTest.java | 8 ++-- 14 files changed, 160 insertions(+), 71 deletions(-) create mode 100644 java/src/main/java/ai/rapids/cudf/DefaultHostMemoryAllocator.java create mode 100644 java/src/main/java/ai/rapids/cudf/HostMemoryAllocator.java diff --git a/java/src/main/java/ai/rapids/cudf/ColumnView.java b/java/src/main/java/ai/rapids/cudf/ColumnView.java index 0a7346d1cbc..f586042f52c 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnView.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnView.java @@ -42,6 +42,8 @@ public class ColumnView implements AutoCloseable, BinaryOperable { protected final long nullCount; protected final ColumnVector.OffHeapState offHeap; + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + /** * Constructs a Column View given a native view address. 
This asserts that if the ColumnView is * of nested-type it doesn't contain non-empty nulls @@ -5023,15 +5025,15 @@ private static HostColumnVectorCore copyToHostNestedHelper( currOffsets = deviceCvPointer.getOffsets(); currValidity = deviceCvPointer.getValid(); if (currData != null) { - hostData = HostMemoryBuffer.allocate(currData.length); + hostData = hostMemoryAllocator.allocate(currData.length); hostData.copyFromDeviceBuffer(currData); } if (currValidity != null) { - hostValid = HostMemoryBuffer.allocate(currValidity.length); + hostValid = hostMemoryAllocator.allocate(currValidity.length); hostValid.copyFromDeviceBuffer(currValidity); } if (currOffsets != null) { - hostOffsets = HostMemoryBuffer.allocate(currOffsets.length); + hostOffsets = hostMemoryAllocator.allocate(currOffsets.length); hostOffsets.copyFromDeviceBuffer(currOffsets); } int numChildren = deviceCvPointer.getNumChildren(); @@ -5094,16 +5096,16 @@ public HostColumnVector copyToHost() { getNullCount(); if (!type.isNestedType()) { if (valid != null) { - hostValidityBuffer = HostMemoryBuffer.allocate(valid.getLength()); + hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength()); hostValidityBuffer.copyFromDeviceBuffer(valid); } if (offsets != null) { - hostOffsetsBuffer = HostMemoryBuffer.allocate(offsets.length); + hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.length); hostOffsetsBuffer.copyFromDeviceBuffer(offsets); } // If a strings column is all null values there is no data buffer allocated if (data != null) { - hostDataBuffer = HostMemoryBuffer.allocate(data.length); + hostDataBuffer = hostMemoryAllocator.allocate(data.length); hostDataBuffer.copyFromDeviceBuffer(data); } HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount), @@ -5112,16 +5114,16 @@ public HostColumnVector copyToHost() { return ret; } else { if (data != null) { - hostDataBuffer = HostMemoryBuffer.allocate(data.length); + hostDataBuffer = hostMemoryAllocator.allocate(data.length); 
hostDataBuffer.copyFromDeviceBuffer(data); } if (valid != null) { - hostValidityBuffer = HostMemoryBuffer.allocate(valid.getLength()); + hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength()); hostValidityBuffer.copyFromDeviceBuffer(valid); } if (offsets != null) { - hostOffsetsBuffer = HostMemoryBuffer.allocate(offsets.getLength()); + hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.getLength()); hostOffsetsBuffer.copyFromDeviceBuffer(offsets); } List children = new ArrayList<>(); diff --git a/java/src/main/java/ai/rapids/cudf/DefaultHostMemoryAllocator.java b/java/src/main/java/ai/rapids/cudf/DefaultHostMemoryAllocator.java new file mode 100644 index 00000000000..98a5b00cf85 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/DefaultHostMemoryAllocator.java @@ -0,0 +1,36 @@ +/* + * + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package ai.rapids.cudf; + +public class DefaultHostMemoryAllocator implements HostMemoryAllocator { + private static final HostMemoryAllocator INSTANCE = new DefaultHostMemoryAllocator(); + public static HostMemoryAllocator get() { + return INSTANCE; + } + + @Override + public HostMemoryBuffer allocate(long bytes, boolean preferPinned) { + return HostMemoryBuffer.allocate(bytes, preferPinned); + } + + @Override + public HostMemoryBuffer allocate(long bytes) { + return HostMemoryBuffer.allocate(bytes); + } +} diff --git a/java/src/main/java/ai/rapids/cudf/HostMemoryAllocator.java b/java/src/main/java/ai/rapids/cudf/HostMemoryAllocator.java new file mode 100644 index 00000000000..9834eb85e18 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/HostMemoryAllocator.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf; + +public interface HostMemoryAllocator { + + /** + * Allocate memory, but be sure to close the returned buffer to avoid memory leaks. + * @param bytes size in bytes to allocate + * @param preferPinned If set to true, the pinned memory pool will be used if possible with a + * fallback to off-heap memory. If set to false, the allocation will always + * be from off-heap memory. 
+ * @return the newly created buffer + */ + HostMemoryBuffer allocate(long bytes, boolean preferPinned); + + /** + * Allocate memory, but be sure to close the returned buffer to avoid memory leaks. Whether pinned + * memory is preferred for the allocation is up to the implementor. + * + * @param bytes size in bytes to allocate + * @return the newly created buffer + */ + HostMemoryBuffer allocate(long bytes); +} diff --git a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java index 40a22604f49..50c3bec9cf8 100644 --- a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java +++ b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java @@ -61,6 +61,8 @@ public class JCudfSerialization { private static final int SER_FORMAT_MAGIC_NUMBER = 0x43554446; private static final short VERSION_NUMBER = 0x0000; + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + private static final class ColumnOffsets { private final long validity; private final long offsets; @@ -1817,7 +1819,7 @@ public static HostConcatResult concatToHostBuffer(SerializedTableHeader[] header ColumnBufferProvider[][] providersPerColumn = providersFrom(headers, dataBuffers); try { SerializedTableHeader combined = calcConcatHeader(providersPerColumn); - HostMemoryBuffer hostBuffer = HostMemoryBuffer.allocate(combined.dataLen); + HostMemoryBuffer hostBuffer = hostMemoryAllocator.allocate(combined.dataLen); try { try (NvtxRange range = new NvtxRange("Concat Host Side", NvtxColor.GREEN)) { DataWriter writer = writerFrom(hostBuffer); @@ -1934,7 +1936,7 @@ public static TableAndRowCountPair readTableFrom(InputStream in) throws IOExcept return new TableAndRowCountPair(0, null); } - try (HostMemoryBuffer hostBuffer = HostMemoryBuffer.allocate(header.dataLen)) { + try (HostMemoryBuffer hostBuffer = hostMemoryAllocator.allocate(header.dataLen)) { if (header.dataLen > 0) { readTableIntoBuffer(din, header, hostBuffer); 
} diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 6eee935748e..85bd1831a9e 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -234,9 +234,10 @@ public static HostMemoryBuffer tryAllocate(long bytes) { * @return newly created buffer */ public static HostMemoryBuffer allocate(long bytes) { + final HostMemoryAllocator allocator = DefaultHostMemoryAllocator.get(); HostMemoryBuffer result = tryAllocate(bytes); if (result == null) { - result = HostMemoryBuffer.allocate(bytes, false); + result = allocator.allocate(bytes, false); } return result; } diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 4eb28f48337..38d060b5f98 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -47,6 +47,8 @@ public final class Table implements AutoCloseable { private long nativeHandle; private ColumnVector[] columns; + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + /** * Table class makes a copy of the array of {@link ColumnVector}s passed to it. 
The class * will decrease the refcount @@ -831,7 +833,7 @@ public static Table readCSV(Schema schema, CSVOptions opts, byte[] buffer, long assert len > 0; assert len <= buffer.length - offset; assert offset >= 0 && offset < buffer.length; - try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) { + try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) { newBuf.setBytes(0, buffer, offset, len); return readCSV(schema, opts, newBuf, 0, len); } @@ -1048,7 +1050,7 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, lon assert len > 0; assert len <= buffer.length - offset; assert offset >= 0 && offset < buffer.length; - try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) { + try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) { newBuf.setBytes(0, buffer, offset, len); return readJSON(schema, opts, newBuf, 0, len); } @@ -1152,7 +1154,7 @@ public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, assert len > 0; assert len <= buffer.length - offset; assert offset >= 0 && offset < buffer.length; - try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) { + try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) { newBuf.setBytes(0, buffer, offset, len); return readParquet(opts, newBuf, 0, len); } @@ -1230,7 +1232,7 @@ public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long assert len <= buffer.length - offset; len = len > 0 ? 
len : buffer.length - offset; - try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) { + try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) { newBuf.setBytes(0, buffer, offset, len); return readAvro(opts, newBuf, 0, len); } @@ -1310,7 +1312,7 @@ public static Table readORC(ORCOptions opts, byte[] buffer, long offset, long le assert len > 0; assert len <= buffer.length - offset; assert offset >= 0 && offset < buffer.length; - try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) { + try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) { newBuf.setBytes(0, buffer, offset, len); return readORC(opts, newBuf, 0, len); } @@ -1609,7 +1611,7 @@ private static class ArrowReaderWrapper implements AutoCloseable { private ArrowReaderWrapper(HostBufferProvider provider) { this.provider = provider; - buffer = HostMemoryBuffer.allocate(10 * 1024 * 1024, false); + buffer = hostMemoryAllocator.allocate(10 * 1024 * 1024, false); } // Called From JNI diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java index 1ab3b97945d..7d7ab732c26 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java @@ -16,19 +16,12 @@ package ai.rapids.cudf.nvcomp; -import ai.rapids.cudf.BaseDeviceMemoryBuffer; -import ai.rapids.cudf.CloseableArray; -import ai.rapids.cudf.Cuda; -import ai.rapids.cudf.DeviceMemoryBuffer; -import ai.rapids.cudf.HostMemoryBuffer; -import ai.rapids.cudf.MemoryBuffer; -import ai.rapids.cudf.NvtxColor; -import ai.rapids.cudf.NvtxRange; - -import java.util.Arrays; +import ai.rapids.cudf.*; /** Multi-buffer LZ4 compressor */ public class BatchedLZ4Compressor { + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + static final long MAX_CHUNK_SIZE = 16777216; // in bytes // each chunk has a 64-bit integer 
value as metadata containing the compressed size static final long METADATA_BYTES_PER_CHUNK = 8; @@ -207,7 +200,7 @@ private DeviceMemoryBuffer putAddrsAndSizesOnDevice(long[] inputAddrs, final long outputAddrsOffset = inputAddrs.length * 8L; final long sizesOffset = outputAddrsOffset + inputAddrs.length * 8L; try (NvtxRange range = new NvtxRange("putAddrsAndSizesOnDevice", NvtxColor.YELLOW)) { - try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(totalSize); + try (HostMemoryBuffer hostbuf = hostMemoryAllocator.allocate(totalSize); DeviceMemoryBuffer result = DeviceMemoryBuffer.allocate(totalSize)) { hostbuf.setLongs(0, inputAddrs, 0, inputAddrs.length); hostbuf.setLongs(outputAddrsOffset, outputAddrs, 0, outputAddrs.length); @@ -224,7 +217,7 @@ private DeviceMemoryBuffer putAddrsAndSizesOnDevice(long[] inputAddrs, // Synchronously copy the resulting compressed sizes from device memory to host memory. private long[] getOutputChunkSizes(BaseDeviceMemoryBuffer devChunkSizes, Cuda.Stream stream) { try (NvtxRange range = new NvtxRange("getOutputChunkSizes", NvtxColor.YELLOW)) { - try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(devChunkSizes.getLength())) { + try (HostMemoryBuffer hostbuf = hostMemoryAllocator.allocate(devChunkSizes.getLength())) { hostbuf.copyFromDeviceBuffer(devChunkSizes, stream); int numChunks = (int) (devChunkSizes.getLength() / 8); long[] result = new long[numChunks]; diff --git a/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java b/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java index 0e1fbad6129..11b9b2d0ccf 100644 --- a/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java +++ b/java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java @@ -6737,10 +6737,11 @@ void testColumnViewWithNonEmptyNullsIsCleared() { List list1 = Arrays.asList(4, 5, null); List list2 = Arrays.asList(7, 8, 9); List list3 = null; + final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); try (ColumnVector input = 
ColumnVectorTest.makeListsColumn(DType.INT32, list0, list1, list2, list3); BaseDeviceMemoryBuffer baseValidityBuffer = input.getDeviceBufferFor(BufferType.VALIDITY); BaseDeviceMemoryBuffer baseOffsetBuffer = input.getDeviceBufferFor(BufferType.OFFSET); - HostMemoryBuffer newValidity = HostMemoryBuffer.allocate(BitVectorHelper.getValidityAllocationSizeInBytes(4))) { + HostMemoryBuffer newValidity = hostMemoryAllocator.allocate(BitVectorHelper.getValidityAllocationSizeInBytes(4))) { newValidity.copyFromDeviceBuffer(baseValidityBuffer); // we are setting list1 with 3 elements to null. This will result in a non-empty null in the diff --git a/java/src/test/java/ai/rapids/cudf/ColumnViewNonEmptyNullsTest.java b/java/src/test/java/ai/rapids/cudf/ColumnViewNonEmptyNullsTest.java index 45e14ef8e26..8d5351d95f4 100644 --- a/java/src/test/java/ai/rapids/cudf/ColumnViewNonEmptyNullsTest.java +++ b/java/src/test/java/ai/rapids/cudf/ColumnViewNonEmptyNullsTest.java @@ -34,6 +34,8 @@ */ public class ColumnViewNonEmptyNullsTest extends CudfTestBase { + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + @Test void testAndNullReconfigureNulls() { try (ColumnVector v0 = ColumnVector.fromBoxedInts(0, 100, null, null, Integer.MIN_VALUE, null); @@ -84,7 +86,7 @@ private ColumnView[] getColumnViewWithNonEmptyNulls() { ColumnVector input = ColumnVectorTest.makeListsColumn(DType.INT32, list0, list1, list2, list3); // Modify the validity buffer BaseDeviceMemoryBuffer dmb = input.getDeviceBufferFor(BufferType.VALIDITY); - try (HostMemoryBuffer newValidity = HostMemoryBuffer.allocate(64)) { + try (HostMemoryBuffer newValidity = hostMemoryAllocator.allocate(64)) { newValidity.copyFromDeviceBuffer(dmb); BitVectorHelper.setNullAt(newValidity, 1); dmb.copyFromHostBuffer(newValidity); diff --git a/java/src/test/java/ai/rapids/cudf/CuFileTest.java b/java/src/test/java/ai/rapids/cudf/CuFileTest.java index 10415cae893..c819b617ced 100644 --- 
a/java/src/test/java/ai/rapids/cudf/CuFileTest.java +++ b/java/src/test/java/ai/rapids/cudf/CuFileTest.java @@ -27,6 +27,9 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue; public class CuFileTest extends CudfTestBase { + + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + @AfterEach void tearDown() { if (PinnedMemoryPool.isInitialized()) { @@ -67,10 +70,10 @@ public void testAppendToExistingFile(@TempDir File tempDir) throws IOException { } private void verifyCopyToFile(File tempFile) { - try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer orig = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer dest = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer dest = hostMemoryAllocator.allocate(16)) { orig.setLong(0, 123456789); from.copyFromHostBuffer(orig); CuFile.writeDeviceBufferToFile(tempFile, 0, from); @@ -81,10 +84,10 @@ private void verifyCopyToFile(File tempFile) { } private void verifyAppendToFile(File tempFile) { - try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer orig = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer dest = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer dest = hostMemoryAllocator.allocate(16)) { orig.setLong(0, 123456789); from.copyFromHostBuffer(orig); assertEquals(0, CuFile.appendDeviceBufferToFile(tempFile, from)); @@ -128,7 +131,7 @@ public void testReadWriteRegisteredBuffer(@TempDir File tempDir) { } private void verifyReadWrite(File tempFile, int length, boolean registerBuffer) { - try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(length); + try (HostMemoryBuffer orig = hostMemoryAllocator.allocate(length); CuFileBuffer from = CuFileBuffer.allocate(length, 
registerBuffer); CuFileWriteHandle writer = new CuFileWriteHandle(tempFile.getAbsolutePath())) { orig.setLong(0, 123456789); @@ -141,7 +144,7 @@ private void verifyReadWrite(File tempFile, int length, boolean registerBuffer) } try (CuFileBuffer to = CuFileBuffer.allocate(length, registerBuffer); CuFileReadHandle reader = new CuFileReadHandle(tempFile.getAbsolutePath()); - HostMemoryBuffer dest = HostMemoryBuffer.allocate(length)) { + HostMemoryBuffer dest = hostMemoryAllocator.allocate(length)) { reader.read(to, 0); dest.copyFromDeviceBuffer(to); assertEquals(123456789, dest.getLong(0)); diff --git a/java/src/test/java/ai/rapids/cudf/GatherMapTest.java b/java/src/test/java/ai/rapids/cudf/GatherMapTest.java index b0e78a2c2cd..edf469f4b03 100644 --- a/java/src/test/java/ai/rapids/cudf/GatherMapTest.java +++ b/java/src/test/java/ai/rapids/cudf/GatherMapTest.java @@ -24,6 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class GatherMapTest { + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + @Test void testInvalidBuffer() { try (DeviceMemoryBuffer buffer = DeviceMemoryBuffer.allocate(707)) { @@ -68,7 +70,7 @@ void testInvalidColumnView() { @Test void testToColumnView() { - try (HostMemoryBuffer hostBuffer = HostMemoryBuffer.allocate(8 * 4)) { + try (HostMemoryBuffer hostBuffer = hostMemoryAllocator.allocate(8 * 4)) { hostBuffer.setInts(0, new int[]{10, 11, 12, 13, 14, 15, 16, 17}, 0, 8); try (DeviceMemoryBuffer devBuffer = DeviceMemoryBuffer.allocate(8*4)) { devBuffer.copyFromHostBuffer(hostBuffer); @@ -78,7 +80,7 @@ void testToColumnView() { assertEquals(DType.INT32, view.getType()); assertEquals(0, view.getNullCount()); assertEquals(8, view.getRowCount()); - try (HostMemoryBuffer viewHostBuffer = HostMemoryBuffer.allocate(8 * 4)) { + try (HostMemoryBuffer viewHostBuffer = hostMemoryAllocator.allocate(8 * 4)) { viewHostBuffer.copyFromDeviceBuffer(view.getData()); for (int i = 0; i < 8; i++) 
{ assertEquals(i + 10, viewHostBuffer.getInt(4*i)); @@ -88,7 +90,7 @@ void testToColumnView() { assertEquals(DType.INT32, view.getType()); assertEquals(0, view.getNullCount()); assertEquals(2, view.getRowCount()); - try (HostMemoryBuffer viewHostBuffer = HostMemoryBuffer.allocate(8)) { + try (HostMemoryBuffer viewHostBuffer = hostMemoryAllocator.allocate(8)) { viewHostBuffer.copyFromDeviceBuffer(view.getData()); assertEquals(13, viewHostBuffer.getInt(0)); assertEquals(14, viewHostBuffer.getInt(4)); diff --git a/java/src/test/java/ai/rapids/cudf/MemoryBufferTest.java b/java/src/test/java/ai/rapids/cudf/MemoryBufferTest.java index c332ce660d1..ec36b4f82b0 100644 --- a/java/src/test/java/ai/rapids/cudf/MemoryBufferTest.java +++ b/java/src/test/java/ai/rapids/cudf/MemoryBufferTest.java @@ -25,13 +25,15 @@ import static org.junit.jupiter.api.Assertions.*; public class MemoryBufferTest extends CudfTestBase { + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + private static final byte[] BYTES = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; private static final byte[] EXPECTED = {0, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; @Test public void testAddressOutOfBoundsExceptionWhenCopying() { - try (HostMemoryBuffer from = HostMemoryBuffer.allocate(16); - HostMemoryBuffer to = HostMemoryBuffer.allocate(16)) { + try (HostMemoryBuffer from = hostMemoryAllocator.allocate(16); + HostMemoryBuffer to = hostMemoryAllocator.allocate(16)) { assertThrows(AssertionError.class, () -> to.copyFromMemoryBuffer(-1, from, 0, 16, Cuda.DEFAULT_STREAM)); assertThrows(AssertionError.class, () -> to.copyFromMemoryBuffer(16, from, 0, 16, Cuda.DEFAULT_STREAM)); assertThrows(AssertionError.class, () -> to.copyFromMemoryBuffer(0, from, -1, 16, Cuda.DEFAULT_STREAM)); @@ -45,8 +47,8 @@ public void testAddressOutOfBoundsExceptionWhenCopying() { @Test public void testAddressOutOfBoundsExceptionWhenCopyingAsync() { - try (HostMemoryBuffer 
from = HostMemoryBuffer.allocate(16); - HostMemoryBuffer to = HostMemoryBuffer.allocate(16)) { + try (HostMemoryBuffer from = hostMemoryAllocator.allocate(16); + HostMemoryBuffer to = hostMemoryAllocator.allocate(16)) { assertThrows(AssertionError.class, () -> to.copyFromMemoryBufferAsync(-1, from, 0, 16, Cuda.DEFAULT_STREAM)); assertThrows(AssertionError.class, () -> to.copyFromMemoryBufferAsync(16, from, 0, 16, Cuda.DEFAULT_STREAM)); assertThrows(AssertionError.class, () -> to.copyFromMemoryBufferAsync(0, from, -1, 16, Cuda.DEFAULT_STREAM)); @@ -60,10 +62,10 @@ public void testAddressOutOfBoundsExceptionWhenCopyingAsync() { @Test public void testCopyingFromDeviceToDevice() { - try (HostMemoryBuffer in = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer in = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer out = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer out = hostMemoryAllocator.allocate(16)) { in.setBytes(0, BYTES, 0, 16); from.copyFromHostBuffer(in); to.copyFromMemoryBuffer(0, from, 0, 16, Cuda.DEFAULT_STREAM); @@ -75,10 +77,10 @@ public void testCopyingFromDeviceToDevice() { @Test public void testCopyingFromDeviceToDeviceAsync() { - try (HostMemoryBuffer in = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer in = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer out = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer out = hostMemoryAllocator.allocate(16)) { in.setBytes(0, BYTES, 0, 16); from.copyFromHostBuffer(in); to.copyFromMemoryBufferAsync(0, from, 0, 16, Cuda.DEFAULT_STREAM); @@ -91,8 +93,8 @@ public void testCopyingFromDeviceToDeviceAsync() { @Test public void testCopyingFromHostToHost() { - try (HostMemoryBuffer from = HostMemoryBuffer.allocate(16); - HostMemoryBuffer to = 
HostMemoryBuffer.allocate(16)) { + try (HostMemoryBuffer from = hostMemoryAllocator.allocate(16); + HostMemoryBuffer to = hostMemoryAllocator.allocate(16)) { from.setBytes(0, BYTES, 0, 16); to.setBytes(0, BYTES, 0, 16); to.copyFromMemoryBuffer(1, from, 2, 3, Cuda.DEFAULT_STREAM); @@ -102,8 +104,8 @@ public void testCopyingFromHostToHost() { @Test public void testCopyingFromHostToHostAsync() { - try (HostMemoryBuffer from = HostMemoryBuffer.allocate(16); - HostMemoryBuffer to = HostMemoryBuffer.allocate(16)) { + try (HostMemoryBuffer from = hostMemoryAllocator.allocate(16); + HostMemoryBuffer to = hostMemoryAllocator.allocate(16)) { from.setBytes(0, BYTES, 0, 16); to.setBytes(0, BYTES, 0, 16); to.copyFromMemoryBufferAsync(1, from, 2, 3, Cuda.DEFAULT_STREAM); @@ -113,9 +115,9 @@ public void testCopyingFromHostToHostAsync() { @Test public void testCopyingFromHostToDevice() { - try (HostMemoryBuffer from = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer from = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer out = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer out = hostMemoryAllocator.allocate(16)) { from.setBytes(0, BYTES, 0, 16); to.copyFromMemoryBuffer(0, from, 0, 16, Cuda.DEFAULT_STREAM); to.copyFromMemoryBufferAsync(1, from, 2, 3, Cuda.DEFAULT_STREAM); @@ -126,9 +128,9 @@ public void testCopyingFromHostToDevice() { @Test public void testCopyingFromHostToDeviceAsync() { - try (HostMemoryBuffer from = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer from = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer out = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer out = hostMemoryAllocator.allocate(16)) { from.setBytes(0, BYTES, 0, 16); to.copyFromMemoryBufferAsync(0, from, 0, 16, Cuda.DEFAULT_STREAM); to.copyFromMemoryBufferAsync(1, from, 2, 3, Cuda.DEFAULT_STREAM); @@ -140,9 +142,9 @@ public void 
testCopyingFromHostToDeviceAsync() { @Test public void testCopyingFromDeviceToHost() { - try (HostMemoryBuffer in = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer in = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer to = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer to = hostMemoryAllocator.allocate(16)) { in.setBytes(0, BYTES, 0, 16); from.copyFromHostBuffer(in); to.setBytes(0, BYTES, 0, 16); @@ -153,9 +155,9 @@ public void testCopyingFromDeviceToHost() { @Test public void testCopyingFromDeviceToHostAsync() { - try (HostMemoryBuffer in = HostMemoryBuffer.allocate(16); + try (HostMemoryBuffer in = hostMemoryAllocator.allocate(16); DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16); - HostMemoryBuffer to = HostMemoryBuffer.allocate(16)) { + HostMemoryBuffer to = hostMemoryAllocator.allocate(16)) { in.setBytes(0, BYTES, 0, 16); from.copyFromHostBuffer(in); to.setBytes(0, BYTES, 0, 16); diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index f17197ef608..5c0c738a20f 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -75,6 +75,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TableTest extends CudfTestBase { + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + private static final File TEST_PARQUET_FILE = TestUtils.getResourceAsFile("acq.parquet"); private static final File TEST_PARQUET_FILE_CHUNKED_READ = TestUtils.getResourceAsFile("splittable.parquet"); private static final File TEST_PARQUET_FILE_BINARY = TestUtils.getResourceAsFile("binary.parquet"); @@ -440,7 +442,7 @@ void testReadJSONTableWithMeta() { "{ \"A\": 3, \"B\": 6, \"C\": \"Z\"}\n" + "{ \"A\": 4, \"B\": 8, \"C\": \"W\"}\n").getBytes(StandardCharsets.UTF_8); final int numBytes = data.length; - try (HostMemoryBuffer 
hostbuf = HostMemoryBuffer.allocate(numBytes)) { + try (HostMemoryBuffer hostbuf = hostMemoryAllocator.allocate(numBytes)) { hostbuf.setBytes(0, data, 0, numBytes); try (Table expected = new Table.TestBuilder() .column(1L, 2L, 3L, 4L) @@ -3465,7 +3467,7 @@ void testSerializationRoundTripConcatOnHostEmpty() throws IOException { do { head = new JCudfSerialization.SerializedTableHeader(din); if (head.wasInitialized()) { - HostMemoryBuffer buff = HostMemoryBuffer.allocate(head.getDataLen()); + HostMemoryBuffer buff = hostMemoryAllocator.allocate(head.getDataLen()); buffers.add(buff); JCudfSerialization.readTableIntoBuffer(din, head, buff); assert head.wasDataRead(); @@ -3624,7 +3626,7 @@ void testSerializationRoundTripConcatHostSide() throws IOException { do { head = new JCudfSerialization.SerializedTableHeader(din); if (head.wasInitialized()) { - HostMemoryBuffer buff = HostMemoryBuffer.allocate(100 * 1024); + HostMemoryBuffer buff = hostMemoryAllocator.allocate(100 * 1024); buffers.add(buff); JCudfSerialization.readTableIntoBuffer(din, head, buff); assert head.wasDataRead(); @@ -3665,7 +3667,7 @@ private void testSerializationRoundTripToHost(Table t) throws IOException { JCudfSerialization.SerializedTableHeader header = new JCudfSerialization.SerializedTableHeader(din); assertTrue(header.wasInitialized()); - try (HostMemoryBuffer buffer = HostMemoryBuffer.allocate(header.getDataLen())) { + try (HostMemoryBuffer buffer = hostMemoryAllocator.allocate(header.getDataLen())) { JCudfSerialization.readTableIntoBuffer(din, header, buffer); assertTrue(header.wasDataRead()); HostColumnVector[] hostColumns = @@ -3727,7 +3729,7 @@ void testConcatHost() throws IOException { DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray())); JCudfSerialization.SerializedTableHeader header = new JCudfSerialization.SerializedTableHeader(in); assert header.wasInitialized(); - try (HostMemoryBuffer buff = HostMemoryBuffer.allocate(header.getDataLen())) { + try 
(HostMemoryBuffer buff = hostMemoryAllocator.allocate(header.getDataLen())) { JCudfSerialization.readTableIntoBuffer(in, header, buff); assert header.wasDataRead(); try (Table result = JCudfSerialization.readAndConcat( @@ -3758,7 +3760,7 @@ void testSerializationRoundTripSlicedHostSide() throws IOException { do { head = new JCudfSerialization.SerializedTableHeader(din); if (head.wasInitialized()) { - HostMemoryBuffer buff = HostMemoryBuffer.allocate(100 * 1024); + HostMemoryBuffer buff = hostMemoryAllocator.allocate(100 * 1024); buffers.add(buff); JCudfSerialization.readTableIntoBuffer(din, head, buff); assert head.wasDataRead(); @@ -7985,7 +7987,7 @@ private final class MyBufferConsumer implements HostBufferConsumer, AutoCloseabl long offset = 0; public MyBufferConsumer() { - buffer = HostMemoryBuffer.allocate(10 * 1024 * 1024); + buffer = hostMemoryAllocator.allocate(10 * 1024 * 1024); } @Override diff --git a/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java b/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java index ec14a1cfee6..66f4fe39109 100644 --- a/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java +++ b/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java @@ -26,6 +26,8 @@ import java.util.Optional; public class NvcompTest { + private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); + private static final Logger log = LoggerFactory.getLogger(ColumnVector.class); @Test @@ -68,9 +70,9 @@ void testBatchedLZ4RoundTripAsync() { // check the decompressed results against the original for (int i = 0; i < numBuffers; ++i) { try (HostMemoryBuffer expected = - HostMemoryBuffer.allocate(originalBuffers.get(i).getLength()); + hostMemoryAllocator.allocate(originalBuffers.get(i).getLength()); HostMemoryBuffer actual = - HostMemoryBuffer.allocate(uncompressedBuffers.get(i).getLength())) { + hostMemoryAllocator.allocate(uncompressedBuffers.get(i).getLength())) { Assertions.assertTrue(expected.getLength() <= 
Integer.MAX_VALUE); Assertions.assertTrue(actual.getLength() <= Integer.MAX_VALUE); Assertions.assertEquals(expected.getLength(), actual.getLength(), @@ -114,7 +116,7 @@ private DeviceMemoryBuffer initBatchBuffer(long[] data, int bufferId) { } long[] bufferData = Arrays.copyOfRange(data, dataStart, dataStart + dataLength + 1); DeviceMemoryBuffer devBuffer = null; - try (HostMemoryBuffer hmb = HostMemoryBuffer.allocate(bufferData.length * 8)) { + try (HostMemoryBuffer hmb = hostMemoryAllocator.allocate(bufferData.length * 8)) { hmb.setLongs(0, bufferData, 0, bufferData.length); devBuffer = DeviceMemoryBuffer.allocate(hmb.getLength()); devBuffer.copyFromHostBuffer(hmb); From 44649ec2ea1ac228b28719f761543b3479e09ac9 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 18 Aug 2023 22:38:25 -0700 Subject: [PATCH 2/6] unroll imports --- .../ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java index 7d7ab732c26..1aa7e5e11a0 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java @@ -16,7 +16,16 @@ package ai.rapids.cudf.nvcomp; -import ai.rapids.cudf.*; +import ai.rapids.cudf.BaseDeviceMemoryBuffer; +import ai.rapids.cudf.CloseableArray; +import ai.rapids.cudf.Cuda; +import ai.rapids.cudf.DefaultHostMemoryAllocator; +import ai.rapids.cudf.DeviceMemoryBuffer; +import ai.rapids.cudf.HostMemoryAllocator; +import ai.rapids.cudf.HostMemoryBuffer; +import ai.rapids.cudf.MemoryBuffer; +import ai.rapids.cudf.NvtxColor; +import ai.rapids.cudf.NvtxRange; /** Multi-buffer LZ4 compressor */ public class BatchedLZ4Compressor { From c77a757a60ba3474e3bec6297de61e33507ab2e6 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Fri, 18 Aug 2023 22:55:00 -0700 Subject: [PATCH 3/6] 
pre-commit --- java/src/test/java/ai/rapids/cudf/CuFileTest.java | 2 +- java/src/test/java/ai/rapids/cudf/GatherMapTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/src/test/java/ai/rapids/cudf/CuFileTest.java b/java/src/test/java/ai/rapids/cudf/CuFileTest.java index c819b617ced..8945b6684d5 100644 --- a/java/src/test/java/ai/rapids/cudf/CuFileTest.java +++ b/java/src/test/java/ai/rapids/cudf/CuFileTest.java @@ -29,7 +29,7 @@ public class CuFileTest extends CudfTestBase { private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); - + @AfterEach void tearDown() { if (PinnedMemoryPool.isInitialized()) { diff --git a/java/src/test/java/ai/rapids/cudf/GatherMapTest.java b/java/src/test/java/ai/rapids/cudf/GatherMapTest.java index edf469f4b03..8bab049c0af 100644 --- a/java/src/test/java/ai/rapids/cudf/GatherMapTest.java +++ b/java/src/test/java/ai/rapids/cudf/GatherMapTest.java @@ -25,7 +25,7 @@ public class GatherMapTest { private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); - + @Test void testInvalidBuffer() { try (DeviceMemoryBuffer buffer = DeviceMemoryBuffer.allocate(707)) { From 8b846335fc16338ecc6a35488c394e6e0312ca44 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Tue, 22 Aug 2023 10:35:31 -0700 Subject: [PATCH 4/6] explicit allocator param --- java/src/main/java/ai/rapids/cudf/ColumnView.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/ColumnView.java b/java/src/main/java/ai/rapids/cudf/ColumnView.java index f586042f52c..bdcfe89687b 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnView.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnView.java @@ -42,8 +42,6 @@ public class ColumnView implements AutoCloseable, BinaryOperable { protected final long nullCount; protected final ColumnVector.OffHeapState offHeap; - private static final HostMemoryAllocator 
hostMemoryAllocator = DefaultHostMemoryAllocator.get(); - /** * Constructs a Column View given a native view address. This asserts that if the ColumnView is * of nested-type it doesn't contain non-empty nulls @@ -5005,7 +5003,7 @@ private static NestedColumnVector createNestedColumnVector(DType type, long rows ///////////////////////////////////////////////////////////////////////////// private static HostColumnVectorCore copyToHostNestedHelper( - ColumnView deviceCvPointer) { + ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) { if (deviceCvPointer == null) { return null; } @@ -5039,7 +5037,7 @@ private static HostColumnVectorCore copyToHostNestedHelper( int numChildren = deviceCvPointer.getNumChildren(); for (int i = 0; i < numChildren; i++) { try(ColumnView childDevPtr = deviceCvPointer.getChildColumnView(i)) { - children.add(copyToHostNestedHelper(childDevPtr)); + children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator)); } } currNullCount = deviceCvPointer.getNullCount(); @@ -5076,7 +5074,7 @@ private static HostColumnVectorCore copyToHostNestedHelper( /** * Copy the data to the host. 
*/ - public HostColumnVector copyToHost() { + public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { try (NvtxRange toHost = new NvtxRange("ensureOnHost", NvtxColor.BLUE)) { HostMemoryBuffer hostDataBuffer = null; HostMemoryBuffer hostValidityBuffer = null; @@ -5129,7 +5127,7 @@ public HostColumnVector copyToHost() { List children = new ArrayList<>(); for (int i = 0; i < getNumChildren(); i++) { try (ColumnView childDevPtr = getChildColumnView(i)) { - children.add(copyToHostNestedHelper(childDevPtr)); + children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator)); } } HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount), @@ -5162,6 +5160,10 @@ public HostColumnVector copyToHost() { } } + public HostColumnVector copyToHost() { + return copyToHost(DefaultHostMemoryAllocator.get()); + } + /** * Exact check if a column or its descendants have non-empty null rows * From 20a3d5207cb414d748d5f15f31af37ddb56dc08d Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Tue, 22 Aug 2023 13:06:47 -0700 Subject: [PATCH 5/6] more explicit params Signed-off-by: Gera Shegalov --- .../ai/rapids/cudf/JCudfSerialization.java | 21 +++++++-- .../java/ai/rapids/cudf/PinnedMemoryPool.java | 9 ++-- java/src/main/java/ai/rapids/cudf/Table.java | 45 ++++++++++++++++--- 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java index 50c3bec9cf8..7deb5bae541 100644 --- a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java +++ b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java @@ -61,8 +61,6 @@ public class JCudfSerialization { private static final int SER_FORMAT_MAGIC_NUMBER = 0x43554446; private static final short VERSION_NUMBER = 0x0000; - private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); - private static final class ColumnOffsets { private final long 
validity; private final long offsets; @@ -1812,10 +1810,13 @@ public static ContiguousTable concatToContiguousTable(SerializedTableHeader[] he * Concatenate multiple tables in host memory into a single host table buffer. * @param headers table headers corresponding to the host table buffers * @param dataBuffers host table buffer for each input table to be concatenated + * @param hostMemoryAllocator allocator for host memory buffers * @return host table header and buffer */ public static HostConcatResult concatToHostBuffer(SerializedTableHeader[] headers, - HostMemoryBuffer[] dataBuffers) throws IOException { + HostMemoryBuffer[] dataBuffers, + HostMemoryAllocator hostMemoryAllocator + ) throws IOException { ColumnBufferProvider[][] providersPerColumn = providersFrom(headers, dataBuffers); try { SerializedTableHeader combined = calcConcatHeader(providersPerColumn); @@ -1839,6 +1840,12 @@ public static HostConcatResult concatToHostBuffer(SerializedTableHeader[] header } } + public static HostConcatResult concatToHostBuffer(SerializedTableHeader[] headers, + HostMemoryBuffer[] dataBuffers + ) throws IOException { + return concatToHostBuffer(headers, dataBuffers, DefaultHostMemoryAllocator.get()); + } + /** * Deserialize a serialized contiguous table into an array of host columns. * @@ -1918,12 +1925,14 @@ public static TableAndRowCountPair readTableFrom(SerializedTableHeader header, /** * Read a serialize table from the given InputStream. * @param in the stream to read the table data from. + * @param hostMemoryAllocator a host memory allocator for an intermediate host memory buffer * @return the deserialized table in device memory, or null if the stream has no table to read * from, an end of the stream at the very beginning. * @throws IOException on any error. * @throws EOFException if the data stream ended unexpectedly in the middle of processing. 
*/ - public static TableAndRowCountPair readTableFrom(InputStream in) throws IOException { + public static TableAndRowCountPair readTableFrom(InputStream in, + HostMemoryAllocator hostMemoryAllocator) throws IOException { DataInputStream din; if (in instanceof DataInputStream) { din = (DataInputStream) in; @@ -1944,6 +1953,10 @@ public static TableAndRowCountPair readTableFrom(InputStream in) throws IOExcept } } + public static TableAndRowCountPair readTableFrom(InputStream in) throws IOException { + return readTableFrom(in, DefaultHostMemoryAllocator.get()); + } + /** Holds the result of deserializing a table. */ public static final class TableAndRowCountPair implements Closeable { private final int numRows; diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 85bd1831a9e..969946a9533 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -233,15 +233,18 @@ public static HostMemoryBuffer tryAllocate(long bytes) { * @param bytes size in bytes to allocate * @return newly created buffer */ - public static HostMemoryBuffer allocate(long bytes) { - final HostMemoryAllocator allocator = DefaultHostMemoryAllocator.get(); + public static HostMemoryBuffer allocate(long bytes, HostMemoryAllocator hostMemoryAllocator) { HostMemoryBuffer result = tryAllocate(bytes); if (result == null) { - result = allocator.allocate(bytes, false); + result = hostMemoryAllocator.allocate(bytes, false); } return result; } + public static HostMemoryBuffer allocate(long bytes) { + return allocate(bytes, DefaultHostMemoryAllocator.get()); + } + /** * Get the number of bytes free in the pinned memory pool. 
* diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 38d060b5f98..0e62363b96e 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -47,8 +47,6 @@ public final class Table implements AutoCloseable { private long nativeHandle; private ColumnVector[] columns; - private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); - /** * Table class makes a copy of the array of {@link ColumnVector}s passed to it. The class * will decrease the refcount @@ -823,10 +821,11 @@ public static Table readCSV(Schema schema, CSVOptions opts, byte[] buffer) { * @param buffer raw UTF8 formatted bytes. * @param offset the starting offset into buffer. * @param len the number of bytes to parse. + * @param hostMemoryAllocator allocator for host memory buffers * @return the data parsed as a table on the GPU. */ public static Table readCSV(Schema schema, CSVOptions opts, byte[] buffer, long offset, - long len) { + long len, HostMemoryAllocator hostMemoryAllocator) { if (len <= 0) { len = buffer.length - offset; } @@ -839,6 +838,12 @@ public static Table readCSV(Schema schema, CSVOptions opts, byte[] buffer, long } } + + public static Table readCSV(Schema schema, CSVOptions opts, byte[] buffer, long offset, + long len) { + return readCSV(schema, opts, buffer, offset, len, DefaultHostMemoryAllocator.get()); + } + /** * Read CSV formatted data. * @param schema the schema of the data. You may use Schema.INFERRED to infer the schema. @@ -1040,10 +1045,11 @@ public static Table readJSON(Schema schema, JSONOptions opts, File path) { * @param buffer raw UTF8 formatted bytes. * @param offset the starting offset into buffer. * @param len the number of bytes to parse. + * @param hostMemoryAllocator allocator for host memory buffers * @return the data parsed as a table on the GPU. 
*/ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, long offset, - long len) { + long len, HostMemoryAllocator hostMemoryAllocator) { if (len <= 0) { len = buffer.length - offset; } @@ -1056,6 +1062,11 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, lon } } + public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, long offset, + long len) { + return readJSON(schema, opts, buffer, offset, len, DefaultHostMemoryAllocator.get()); + } + /** * Read JSON formatted data and infer the column names and schema. * @param opts various JSON parsing options. @@ -1145,9 +1156,11 @@ public static Table readParquet(ParquetOptions opts, byte[] buffer) { * @param buffer raw parquet formatted bytes. * @param offset the starting offset into buffer. * @param len the number of bytes to parse. + * @param hostMemoryAllocator allocator for host memory buffers * @return the data parsed as a table on the GPU. */ - public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, long len) { + public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, long len, + HostMemoryAllocator hostMemoryAllocator) { if (len <= 0) { len = buffer.length - offset; } @@ -1160,6 +1173,10 @@ public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, } } + public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset, long len) { + return readParquet(opts, buffer, offset, len, DefaultHostMemoryAllocator.get()); + } + /** * Read parquet formatted data. * @param opts various parquet parsing options. @@ -1225,9 +1242,11 @@ public static Table readAvro(AvroOptions opts, byte[] buffer) { * @param buffer raw Avro formatted bytes. * @param offset the starting offset into buffer. * @param len the number of bytes to parse. + * @param hostMemoryAllocator allocator for host memory buffers * @return the data parsed as a table on the GPU. 
*/ - public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long len) { + public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long len, + HostMemoryAllocator hostMemoryAllocator) { assert offset >= 0 && offset < buffer.length; assert len <= buffer.length - offset; len = len > 0 ? len : buffer.length - offset; @@ -1238,6 +1257,11 @@ public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long } } + public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long len) { + return readAvro(opts, buffer, offset, len, DefaultHostMemoryAllocator.get()); + } + + /** * Read Avro formatted data. * @param opts various Avro parsing options. @@ -1303,9 +1327,11 @@ public static Table readORC(ORCOptions opts, byte[] buffer) { * @param buffer raw ORC formatted bytes. * @param offset the starting offset into buffer. * @param len the number of bytes to parse. + * @param hostMemoryAllocator allocator for host memory buffers * @return the data parsed as a table on the GPU. */ - public static Table readORC(ORCOptions opts, byte[] buffer, long offset, long len) { + public static Table readORC(ORCOptions opts, byte[] buffer, long offset, long len, + HostMemoryAllocator hostMemoryAllocator) { if (len <= 0) { len = buffer.length - offset; } @@ -1318,6 +1344,11 @@ public static Table readORC(ORCOptions opts, byte[] buffer, long offset, long le } } + public static Table readORC(ORCOptions opts, byte[] buffer, long offset, long len) { + return readORC(opts, buffer, offset, len, DefaultHostMemoryAllocator.get()); + } + + /** * Read ORC formatted data. * @param opts various ORC parsing options. 
From fc539c00b884b3180c1e6859239f451a9e00d3e0 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 23 Aug 2023 13:13:10 -0700 Subject: [PATCH 6/6] readArrowIPCChunked call path Signed-off-by: Gera Shegalov --- java/src/main/java/ai/rapids/cudf/Table.java | 21 +++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 0e62363b96e..57189b052b6 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -1639,10 +1639,13 @@ public static TableWriter writeArrowIPCChunked(ArrowIPCWriterOptions options, private static class ArrowReaderWrapper implements AutoCloseable { private HostBufferProvider provider; private HostMemoryBuffer buffer; + private final HostMemoryAllocator hostMemoryAllocator; - private ArrowReaderWrapper(HostBufferProvider provider) { + private ArrowReaderWrapper(HostBufferProvider provider, + HostMemoryAllocator hostMemoryAllocator) { this.provider = provider; - buffer = hostMemoryAllocator.allocate(10 * 1024 * 1024, false); + this.hostMemoryAllocator = hostMemoryAllocator; + buffer = this.hostMemoryAllocator.allocate(10 * 1024 * 1024, false); } // Called From JNI @@ -1689,8 +1692,9 @@ private ArrowIPCStreamedTableReader(ArrowIPCOptions options, File inputFile) { this.callback = options.getCallback(); } - private ArrowIPCStreamedTableReader(ArrowIPCOptions options, HostBufferProvider provider) { - this.provider = new ArrowReaderWrapper(provider); + private ArrowIPCStreamedTableReader(ArrowIPCOptions options, HostBufferProvider provider, + HostMemoryAllocator hostMemoryAllocator) { + this.provider = new ArrowReaderWrapper(provider, hostMemoryAllocator); this.handle = readArrowIPCBufferBegin(this.provider); this.callback = options.getCallback(); } @@ -1753,9 +1757,16 @@ public static StreamedTableReader readArrowIPCChunked(File inputFile) { * @param provider what will provide the 
data being read. * @return a reader. */ + + public static StreamedTableReader readArrowIPCChunked(ArrowIPCOptions options, + HostBufferProvider provider, + HostMemoryAllocator hostMemoryAllocator) { + return new ArrowIPCStreamedTableReader(options, provider, hostMemoryAllocator); + } + public static StreamedTableReader readArrowIPCChunked(ArrowIPCOptions options, HostBufferProvider provider) { - return new ArrowIPCStreamedTableReader(options, provider); + return new ArrowIPCStreamedTableReader(options, provider, DefaultHostMemoryAllocator.get()); } /**