Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HostMemoryAllocator interface #13924

Merged
merged 8 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions java/src/main/java/ai/rapids/cudf/ColumnView.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ColumnView implements AutoCloseable, BinaryOperable {
protected final long nullCount;
protected final ColumnVector.OffHeapState offHeap;

private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get();

/**
* Constructs a Column View given a native view address. This asserts that if the ColumnView is
* of nested-type it doesn't contain non-empty nulls
Expand Down Expand Up @@ -5023,15 +5025,15 @@ private static HostColumnVectorCore copyToHostNestedHelper(
currOffsets = deviceCvPointer.getOffsets();
currValidity = deviceCvPointer.getValid();
if (currData != null) {
hostData = HostMemoryBuffer.allocate(currData.length);
hostData = hostMemoryAllocator.allocate(currData.length);
hostData.copyFromDeviceBuffer(currData);
}
if (currValidity != null) {
hostValid = HostMemoryBuffer.allocate(currValidity.length);
hostValid = hostMemoryAllocator.allocate(currValidity.length);
hostValid.copyFromDeviceBuffer(currValidity);
}
if (currOffsets != null) {
hostOffsets = HostMemoryBuffer.allocate(currOffsets.length);
hostOffsets = hostMemoryAllocator.allocate(currOffsets.length);
hostOffsets.copyFromDeviceBuffer(currOffsets);
}
int numChildren = deviceCvPointer.getNumChildren();
Expand Down Expand Up @@ -5094,16 +5096,16 @@ public HostColumnVector copyToHost() {
getNullCount();
if (!type.isNestedType()) {
if (valid != null) {
hostValidityBuffer = HostMemoryBuffer.allocate(valid.getLength());
hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength());
hostValidityBuffer.copyFromDeviceBuffer(valid);
}
if (offsets != null) {
hostOffsetsBuffer = HostMemoryBuffer.allocate(offsets.length);
hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.length);
hostOffsetsBuffer.copyFromDeviceBuffer(offsets);
}
// If a strings column is all null values there is no data buffer allocated
if (data != null) {
hostDataBuffer = HostMemoryBuffer.allocate(data.length);
hostDataBuffer = hostMemoryAllocator.allocate(data.length);
hostDataBuffer.copyFromDeviceBuffer(data);
}
HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount),
Expand All @@ -5112,16 +5114,16 @@ public HostColumnVector copyToHost() {
return ret;
} else {
if (data != null) {
hostDataBuffer = HostMemoryBuffer.allocate(data.length);
hostDataBuffer = hostMemoryAllocator.allocate(data.length);
hostDataBuffer.copyFromDeviceBuffer(data);
}

if (valid != null) {
hostValidityBuffer = HostMemoryBuffer.allocate(valid.getLength());
hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength());
hostValidityBuffer.copyFromDeviceBuffer(valid);
}
if (offsets != null) {
hostOffsetsBuffer = HostMemoryBuffer.allocate(offsets.getLength());
hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.getLength());
hostOffsetsBuffer.copyFromDeviceBuffer(offsets);
}
List<HostColumnVectorCore> children = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

public class DefaultHostMemoryAllocator implements HostMemoryAllocator {
private static final HostMemoryAllocator INSTANCE = new DefaultHostMemoryAllocator();
public static HostMemoryAllocator get() {
return INSTANCE;
}

@Override
public HostMemoryBuffer allocate(long bytes, boolean preferPinned) {
return HostMemoryBuffer.allocate(bytes, preferPinned);
}

@Override
public HostMemoryBuffer allocate(long bytes) {
return HostMemoryBuffer.allocate(bytes);
}
}
39 changes: 39 additions & 0 deletions java/src/main/java/ai/rapids/cudf/HostMemoryAllocator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.rapids.cudf;

public interface HostMemoryAllocator {

/**
* Allocate memory, but be sure to close the returned buffer to avoid memory leaks.
* @param bytes size in bytes to allocate
* @param preferPinned If set to true, the pinned memory pool will be used if possible with a
* fallback to off-heap memory. If set to false, the allocation will always
* be from off-heap memory.
* @return the newly created buffer
*/
HostMemoryBuffer allocate(long bytes, boolean preferPinned);

/**
* Allocate memory, but be sure to close the returned buffer to avoid memory leaks. Pinned memory
* for allocations preference is up to the implementor
*
* @param bytes size in bytes to allocate
* @return the newly created buffer
*/
HostMemoryBuffer allocate(long bytes);
}
6 changes: 4 additions & 2 deletions java/src/main/java/ai/rapids/cudf/JCudfSerialization.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class JCudfSerialization {
private static final int SER_FORMAT_MAGIC_NUMBER = 0x43554446;
private static final short VERSION_NUMBER = 0x0000;

private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get();

private static final class ColumnOffsets {
private final long validity;
private final long offsets;
Expand Down Expand Up @@ -1817,7 +1819,7 @@ public static HostConcatResult concatToHostBuffer(SerializedTableHeader[] header
ColumnBufferProvider[][] providersPerColumn = providersFrom(headers, dataBuffers);
try {
SerializedTableHeader combined = calcConcatHeader(providersPerColumn);
HostMemoryBuffer hostBuffer = HostMemoryBuffer.allocate(combined.dataLen);
HostMemoryBuffer hostBuffer = hostMemoryAllocator.allocate(combined.dataLen);
try {
try (NvtxRange range = new NvtxRange("Concat Host Side", NvtxColor.GREEN)) {
DataWriter writer = writerFrom(hostBuffer);
Expand Down Expand Up @@ -1934,7 +1936,7 @@ public static TableAndRowCountPair readTableFrom(InputStream in) throws IOExcept
return new TableAndRowCountPair(0, null);
}

try (HostMemoryBuffer hostBuffer = HostMemoryBuffer.allocate(header.dataLen)) {
try (HostMemoryBuffer hostBuffer = hostMemoryAllocator.allocate(header.dataLen)) {
if (header.dataLen > 0) {
readTableIntoBuffer(din, header, hostBuffer);
}
Expand Down
3 changes: 2 additions & 1 deletion java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ public static HostMemoryBuffer tryAllocate(long bytes) {
* @return newly created buffer
*/
public static HostMemoryBuffer allocate(long bytes) {
final HostMemoryAllocator allocator = DefaultHostMemoryAllocator.get();
HostMemoryBuffer result = tryAllocate(bytes);
if (result == null) {
result = HostMemoryBuffer.allocate(bytes, false);
result = allocator.allocate(bytes, false);
}
return result;
}
Expand Down
14 changes: 8 additions & 6 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public final class Table implements AutoCloseable {
private long nativeHandle;
private ColumnVector[] columns;

private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get();

/**
* Table class makes a copy of the array of {@link ColumnVector}s passed to it. The class
* will decrease the refcount
Expand Down Expand Up @@ -831,7 +833,7 @@ public static Table readCSV(Schema schema, CSVOptions opts, byte[] buffer, long
assert len > 0;
assert len <= buffer.length - offset;
assert offset >= 0 && offset < buffer.length;
try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) {
try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) {
newBuf.setBytes(0, buffer, offset, len);
return readCSV(schema, opts, newBuf, 0, len);
}
Expand Down Expand Up @@ -1048,7 +1050,7 @@ public static Table readJSON(Schema schema, JSONOptions opts, byte[] buffer, lon
assert len > 0;
assert len <= buffer.length - offset;
assert offset >= 0 && offset < buffer.length;
try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) {
try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) {
newBuf.setBytes(0, buffer, offset, len);
return readJSON(schema, opts, newBuf, 0, len);
}
Expand Down Expand Up @@ -1152,7 +1154,7 @@ public static Table readParquet(ParquetOptions opts, byte[] buffer, long offset,
assert len > 0;
assert len <= buffer.length - offset;
assert offset >= 0 && offset < buffer.length;
try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) {
try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) {
newBuf.setBytes(0, buffer, offset, len);
return readParquet(opts, newBuf, 0, len);
}
Expand Down Expand Up @@ -1230,7 +1232,7 @@ public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long
assert len <= buffer.length - offset;
len = len > 0 ? len : buffer.length - offset;

try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) {
try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) {
newBuf.setBytes(0, buffer, offset, len);
return readAvro(opts, newBuf, 0, len);
}
Expand Down Expand Up @@ -1310,7 +1312,7 @@ public static Table readORC(ORCOptions opts, byte[] buffer, long offset, long le
assert len > 0;
assert len <= buffer.length - offset;
assert offset >= 0 && offset < buffer.length;
try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) {
try (HostMemoryBuffer newBuf = hostMemoryAllocator.allocate(len)) {
newBuf.setBytes(0, buffer, offset, len);
return readORC(opts, newBuf, 0, len);
}
Expand Down Expand Up @@ -1609,7 +1611,7 @@ private static class ArrowReaderWrapper implements AutoCloseable {

private ArrowReaderWrapper(HostBufferProvider provider) {
this.provider = provider;
buffer = HostMemoryBuffer.allocate(10 * 1024 * 1024, false);
buffer = hostMemoryAllocator.allocate(10 * 1024 * 1024, false);
}

// Called From JNI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
import ai.rapids.cudf.BaseDeviceMemoryBuffer;
import ai.rapids.cudf.CloseableArray;
import ai.rapids.cudf.Cuda;
import ai.rapids.cudf.DefaultHostMemoryAllocator;
import ai.rapids.cudf.DeviceMemoryBuffer;
import ai.rapids.cudf.HostMemoryAllocator;
import ai.rapids.cudf.HostMemoryBuffer;
import ai.rapids.cudf.MemoryBuffer;
import ai.rapids.cudf.NvtxColor;
import ai.rapids.cudf.NvtxRange;

import java.util.Arrays;

/** Multi-buffer LZ4 compressor */
public class BatchedLZ4Compressor {
private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get();

static final long MAX_CHUNK_SIZE = 16777216; // in bytes
// each chunk has a 64-bit integer value as metadata containing the compressed size
static final long METADATA_BYTES_PER_CHUNK = 8;
Expand Down Expand Up @@ -207,7 +209,7 @@ private DeviceMemoryBuffer putAddrsAndSizesOnDevice(long[] inputAddrs,
final long outputAddrsOffset = inputAddrs.length * 8L;
final long sizesOffset = outputAddrsOffset + inputAddrs.length * 8L;
try (NvtxRange range = new NvtxRange("putAddrsAndSizesOnDevice", NvtxColor.YELLOW)) {
try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(totalSize);
try (HostMemoryBuffer hostbuf = hostMemoryAllocator.allocate(totalSize);
DeviceMemoryBuffer result = DeviceMemoryBuffer.allocate(totalSize)) {
hostbuf.setLongs(0, inputAddrs, 0, inputAddrs.length);
hostbuf.setLongs(outputAddrsOffset, outputAddrs, 0, outputAddrs.length);
Expand All @@ -224,7 +226,7 @@ private DeviceMemoryBuffer putAddrsAndSizesOnDevice(long[] inputAddrs,
// Synchronously copy the resulting compressed sizes from device memory to host memory.
private long[] getOutputChunkSizes(BaseDeviceMemoryBuffer devChunkSizes, Cuda.Stream stream) {
try (NvtxRange range = new NvtxRange("getOutputChunkSizes", NvtxColor.YELLOW)) {
try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(devChunkSizes.getLength())) {
try (HostMemoryBuffer hostbuf = hostMemoryAllocator.allocate(devChunkSizes.getLength())) {
hostbuf.copyFromDeviceBuffer(devChunkSizes, stream);
int numChunks = (int) (devChunkSizes.getLength() / 8);
long[] result = new long[numChunks];
Expand Down
3 changes: 2 additions & 1 deletion java/src/test/java/ai/rapids/cudf/ColumnVectorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6737,10 +6737,11 @@ void testColumnViewWithNonEmptyNullsIsCleared() {
List<Integer> list1 = Arrays.asList(4, 5, null);
List<Integer> list2 = Arrays.asList(7, 8, 9);
List<Integer> list3 = null;
final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get();
try (ColumnVector input = ColumnVectorTest.makeListsColumn(DType.INT32, list0, list1, list2, list3);
BaseDeviceMemoryBuffer baseValidityBuffer = input.getDeviceBufferFor(BufferType.VALIDITY);
BaseDeviceMemoryBuffer baseOffsetBuffer = input.getDeviceBufferFor(BufferType.OFFSET);
HostMemoryBuffer newValidity = HostMemoryBuffer.allocate(BitVectorHelper.getValidityAllocationSizeInBytes(4))) {
HostMemoryBuffer newValidity = hostMemoryAllocator.allocate(BitVectorHelper.getValidityAllocationSizeInBytes(4))) {

newValidity.copyFromDeviceBuffer(baseValidityBuffer);
// we are setting list1 with 3 elements to null. This will result in a non-empty null in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
public class ColumnViewNonEmptyNullsTest extends CudfTestBase {

private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get();

@Test
void testAndNullReconfigureNulls() {
try (ColumnVector v0 = ColumnVector.fromBoxedInts(0, 100, null, null, Integer.MIN_VALUE, null);
Expand Down Expand Up @@ -84,7 +86,7 @@ private ColumnView[] getColumnViewWithNonEmptyNulls() {
ColumnVector input = ColumnVectorTest.makeListsColumn(DType.INT32, list0, list1, list2, list3);
// Modify the validity buffer
BaseDeviceMemoryBuffer dmb = input.getDeviceBufferFor(BufferType.VALIDITY);
try (HostMemoryBuffer newValidity = HostMemoryBuffer.allocate(64)) {
try (HostMemoryBuffer newValidity = hostMemoryAllocator.allocate(64)) {
newValidity.copyFromDeviceBuffer(dmb);
BitVectorHelper.setNullAt(newValidity, 1);
dmb.copyFromHostBuffer(newValidity);
Expand Down
15 changes: 9 additions & 6 deletions java/src/test/java/ai/rapids/cudf/CuFileTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import static org.junit.jupiter.api.Assumptions.assumeTrue;

public class CuFileTest extends CudfTestBase {

private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get();

@AfterEach
void tearDown() {
if (PinnedMemoryPool.isInitialized()) {
Expand Down Expand Up @@ -67,10 +70,10 @@ public void testAppendToExistingFile(@TempDir File tempDir) throws IOException {
}

private void verifyCopyToFile(File tempFile) {
try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(16);
try (HostMemoryBuffer orig = hostMemoryAllocator.allocate(16);
DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16);
DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16);
HostMemoryBuffer dest = HostMemoryBuffer.allocate(16)) {
HostMemoryBuffer dest = hostMemoryAllocator.allocate(16)) {
orig.setLong(0, 123456789);
from.copyFromHostBuffer(orig);
CuFile.writeDeviceBufferToFile(tempFile, 0, from);
Expand All @@ -81,10 +84,10 @@ private void verifyCopyToFile(File tempFile) {
}

private void verifyAppendToFile(File tempFile) {
try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(16);
try (HostMemoryBuffer orig = hostMemoryAllocator.allocate(16);
DeviceMemoryBuffer from = DeviceMemoryBuffer.allocate(16);
DeviceMemoryBuffer to = DeviceMemoryBuffer.allocate(16);
HostMemoryBuffer dest = HostMemoryBuffer.allocate(16)) {
HostMemoryBuffer dest = hostMemoryAllocator.allocate(16)) {
orig.setLong(0, 123456789);
from.copyFromHostBuffer(orig);
assertEquals(0, CuFile.appendDeviceBufferToFile(tempFile, from));
Expand Down Expand Up @@ -128,7 +131,7 @@ public void testReadWriteRegisteredBuffer(@TempDir File tempDir) {
}

private void verifyReadWrite(File tempFile, int length, boolean registerBuffer) {
try (HostMemoryBuffer orig = HostMemoryBuffer.allocate(length);
try (HostMemoryBuffer orig = hostMemoryAllocator.allocate(length);
CuFileBuffer from = CuFileBuffer.allocate(length, registerBuffer);
CuFileWriteHandle writer = new CuFileWriteHandle(tempFile.getAbsolutePath())) {
orig.setLong(0, 123456789);
Expand All @@ -141,7 +144,7 @@ private void verifyReadWrite(File tempFile, int length, boolean registerBuffer)
}
try (CuFileBuffer to = CuFileBuffer.allocate(length, registerBuffer);
CuFileReadHandle reader = new CuFileReadHandle(tempFile.getAbsolutePath());
HostMemoryBuffer dest = HostMemoryBuffer.allocate(length)) {
HostMemoryBuffer dest = hostMemoryAllocator.allocate(length)) {
reader.read(to, 0);
dest.copyFromDeviceBuffer(to);
assertEquals(123456789, dest.getLong(0));
Expand Down
Loading