From 8def2ec1acac6a538002db011d977bb22cfbda82 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 30 Jul 2024 14:34:59 -0500 Subject: [PATCH] Add Java APIs to copy column data to host asynchronously (#16429) Adds Java methods to ColumnView to allow copying of column data to host memory asynchronously. This can be used to avoid many unnecessary stream synchronizations when copying many columns to the host. Authors: - Jason Lowe (https://github.com/jlowe) Approvers: - Nghia Truong (https://github.com/ttnghia) - Robert (Bobby) Evans (https://github.com/revans2) URL: https://github.com/rapidsai/cudf/pull/16429 --- .../main/java/ai/rapids/cudf/ColumnView.java | 52 +++++++++++++------ .../java/ai/rapids/cudf/HostColumnVector.java | 4 ++ .../ai/rapids/cudf/HostColumnVectorCore.java | 4 +- .../ai/rapids/cudf/JCudfSerialization.java | 5 +- 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/ColumnView.java b/java/src/main/java/ai/rapids/cudf/ColumnView.java index 997ff77bae3..8ff2f0f0a73 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnView.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnView.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -5034,8 +5034,8 @@ private static NestedColumnVector createNestedColumnVector(DType type, long rows // DATA MOVEMENT ///////////////////////////////////////////////////////////////////////////// - private static HostColumnVectorCore copyToHostNestedHelper( - ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) { + private static HostColumnVectorCore copyToHostAsyncNestedHelper( + Cuda.Stream stream, ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) { if (deviceCvPointer == null) { return null; } @@ -5056,20 +5056,20 @@ private static HostColumnVectorCore copyToHostNestedHelper( currValidity = deviceCvPointer.getValid(); if (currData != null) { hostData = hostMemoryAllocator.allocate(currData.length); - hostData.copyFromDeviceBuffer(currData); + hostData.copyFromDeviceBufferAsync(currData, stream); } if (currValidity != null) { hostValid = hostMemoryAllocator.allocate(currValidity.length); - hostValid.copyFromDeviceBuffer(currValidity); + hostValid.copyFromDeviceBufferAsync(currValidity, stream); } if (currOffsets != null) { hostOffsets = hostMemoryAllocator.allocate(currOffsets.length); - hostOffsets.copyFromDeviceBuffer(currOffsets); + hostOffsets.copyFromDeviceBufferAsync(currOffsets, stream); } int numChildren = deviceCvPointer.getNumChildren(); for (int i = 0; i < numChildren; i++) { try(ColumnView childDevPtr = deviceCvPointer.getChildColumnView(i)) { - children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator)); + children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator)); } } currNullCount = deviceCvPointer.getNullCount(); @@ -5103,11 +5103,20 @@ private static HostColumnVectorCore copyToHostNestedHelper( } } + /** Copy the data to the host synchronously. 
*/ + public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { + HostColumnVector result = copyToHostAsync(Cuda.DEFAULT_STREAM, hostMemoryAllocator); + Cuda.DEFAULT_STREAM.sync(); + return result; + } + /** - * Copy the data to the host. + * Copy the data to the host asynchronously. The caller MUST synchronize on the stream + * before examining the result. */ - public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { - try (NvtxRange toHost = new NvtxRange("ensureOnHost", NvtxColor.BLUE)) { + public HostColumnVector copyToHostAsync(Cuda.Stream stream, + HostMemoryAllocator hostMemoryAllocator) { + try (NvtxRange toHost = new NvtxRange("toHostAsync", NvtxColor.BLUE)) { HostMemoryBuffer hostDataBuffer = null; HostMemoryBuffer hostValidityBuffer = null; HostMemoryBuffer hostOffsetsBuffer = null; @@ -5127,16 +5136,16 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { if (!type.isNestedType()) { if (valid != null) { hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength()); - hostValidityBuffer.copyFromDeviceBuffer(valid); + hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream); } if (offsets != null) { hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.length); - hostOffsetsBuffer.copyFromDeviceBuffer(offsets); + hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream); } // If a strings column is all null values there is no data buffer allocated if (data != null) { hostDataBuffer = hostMemoryAllocator.allocate(data.length); - hostDataBuffer.copyFromDeviceBuffer(data); + hostDataBuffer.copyFromDeviceBufferAsync(data, stream); } HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount), hostDataBuffer, hostValidityBuffer, hostOffsetsBuffer); @@ -5145,21 +5154,21 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { } else { if (data != null) { hostDataBuffer = hostMemoryAllocator.allocate(data.length); - 
hostDataBuffer.copyFromDeviceBuffer(data); + hostDataBuffer.copyFromDeviceBufferAsync(data, stream); } if (valid != null) { hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength()); - hostValidityBuffer.copyFromDeviceBuffer(valid); + hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream); } if (offsets != null) { hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.getLength()); - hostOffsetsBuffer.copyFromDeviceBuffer(offsets); + hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream); } List children = new ArrayList<>(); for (int i = 0; i < getNumChildren(); i++) { try (ColumnView childDevPtr = getChildColumnView(i)) { - children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator)); + children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator)); } } HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount), @@ -5192,10 +5201,19 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { } } + /** Copy the data to host memory synchronously */ public HostColumnVector copyToHost() { return copyToHost(DefaultHostMemoryAllocator.get()); } + /** + * Copy the data to the host asynchronously. The caller MUST synchronize on the stream + * before examining the result. + */ + public HostColumnVector copyToHostAsync(Cuda.Stream stream) { + return copyToHostAsync(stream, DefaultHostMemoryAllocator.get()); + } + /** * Calculate the total space required to copy the data to the host. This should be padded to * the alignment that the CPU requires. 
diff --git a/java/src/main/java/ai/rapids/cudf/HostColumnVector.java b/java/src/main/java/ai/rapids/cudf/HostColumnVector.java index 6b41d10fee3..61b11673957 100644 --- a/java/src/main/java/ai/rapids/cudf/HostColumnVector.java +++ b/java/src/main/java/ai/rapids/cudf/HostColumnVector.java @@ -92,6 +92,8 @@ public interface EventHandler { public HostColumnVector(DType type, long rows, Optional nullCount, HostMemoryBuffer hostDataBuffer, HostMemoryBuffer hostValidityBuffer, HostMemoryBuffer offsetBuffer, List nestedHcv) { + // NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be + // asynchronously written by the device. super(type, rows, nullCount, hostDataBuffer, hostValidityBuffer, offsetBuffer, nestedHcv); refCount = 0; incRefCountInternal(true); @@ -100,6 +102,8 @@ public HostColumnVector(DType type, long rows, Optional nullCount, HostColumnVector(DType type, long rows, Optional nullCount, HostMemoryBuffer hostDataBuffer, HostMemoryBuffer hostValidityBuffer, HostMemoryBuffer offsetBuffer) { + // NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be + // asynchronously written by the device. super(type, rows, nullCount, hostDataBuffer, hostValidityBuffer, offsetBuffer, new ArrayList<>()); assert !type.equals(DType.LIST) : "This constructor should not be used for list type"; if (nullCount.isPresent() && nullCount.get() > 0 && hostValidityBuffer == null) { diff --git a/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java b/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java index 95d209c0984..a225fbf34e1 100644 --- a/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java +++ b/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,8 @@ public class HostColumnVectorCore implements AutoCloseable { public HostColumnVectorCore(DType type, long rows, Optional nullCount, HostMemoryBuffer data, HostMemoryBuffer validity, HostMemoryBuffer offsets, List nestedChildren) { + // NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be + // asynchronously written by the device. this.offHeap = new OffHeapState(data, validity, offsets); MemoryCleaner.register(this, offHeap); this.type = type; diff --git a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java index 666a8864003..89f363d2b29 100644 --- a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java +++ b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -907,8 +907,9 @@ private static ColumnBufferProvider[] providersFrom(ColumnVector[] columns) { boolean success = false; try { for (int i = 0; i < columns.length; i++) { - onHost[i] = columns[i].copyToHost(); + onHost[i] = columns[i].copyToHostAsync(Cuda.DEFAULT_STREAM); } + Cuda.DEFAULT_STREAM.sync(); ColumnBufferProvider[] ret = providersFrom(onHost, true); success = true; return ret;