diff --git a/java/src/main/java/ai/rapids/cudf/ColumnView.java b/java/src/main/java/ai/rapids/cudf/ColumnView.java index 997ff77bae3..8ff2f0f0a73 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnView.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnView.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -5034,8 +5034,8 @@ private static NestedColumnVector createNestedColumnVector(DType type, long rows // DATA MOVEMENT ///////////////////////////////////////////////////////////////////////////// - private static HostColumnVectorCore copyToHostNestedHelper( - ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) { + private static HostColumnVectorCore copyToHostAsyncNestedHelper( + Cuda.Stream stream, ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) { if (deviceCvPointer == null) { return null; } @@ -5056,20 +5056,20 @@ private static HostColumnVectorCore copyToHostNestedHelper( currValidity = deviceCvPointer.getValid(); if (currData != null) { hostData = hostMemoryAllocator.allocate(currData.length); - hostData.copyFromDeviceBuffer(currData); + hostData.copyFromDeviceBufferAsync(currData, stream); } if (currValidity != null) { hostValid = hostMemoryAllocator.allocate(currValidity.length); - hostValid.copyFromDeviceBuffer(currValidity); + hostValid.copyFromDeviceBufferAsync(currValidity, stream); } if (currOffsets != null) { hostOffsets = hostMemoryAllocator.allocate(currOffsets.length); - hostOffsets.copyFromDeviceBuffer(currOffsets); + hostOffsets.copyFromDeviceBufferAsync(currOffsets, stream); } int numChildren = deviceCvPointer.getNumChildren(); for (int i = 0; i < numChildren; i++) { try(ColumnView childDevPtr = deviceCvPointer.getChildColumnView(i)) { - children.add(copyToHostNestedHelper(childDevPtr, 
hostMemoryAllocator)); + children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator)); } } currNullCount = deviceCvPointer.getNullCount(); @@ -5103,11 +5103,20 @@ private static HostColumnVectorCore copyToHostNestedHelper( } } + /** Copy the data to the host synchronously. */ + public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { + HostColumnVector result = copyToHostAsync(Cuda.DEFAULT_STREAM, hostMemoryAllocator); + Cuda.DEFAULT_STREAM.sync(); + return result; + } + /** - * Copy the data to the host. + * Copy the data to the host asynchronously. The caller MUST synchronize on the stream + * before examining the result. */ - public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { - try (NvtxRange toHost = new NvtxRange("ensureOnHost", NvtxColor.BLUE)) { + public HostColumnVector copyToHostAsync(Cuda.Stream stream, + HostMemoryAllocator hostMemoryAllocator) { + try (NvtxRange toHost = new NvtxRange("toHostAsync", NvtxColor.BLUE)) { HostMemoryBuffer hostDataBuffer = null; HostMemoryBuffer hostValidityBuffer = null; HostMemoryBuffer hostOffsetsBuffer = null; @@ -5127,16 +5136,16 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { if (!type.isNestedType()) { if (valid != null) { hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength()); - hostValidityBuffer.copyFromDeviceBuffer(valid); + hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream); } if (offsets != null) { hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.length); - hostOffsetsBuffer.copyFromDeviceBuffer(offsets); + hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream); } // If a strings column is all null values there is no data buffer allocated if (data != null) { hostDataBuffer = hostMemoryAllocator.allocate(data.length); - hostDataBuffer.copyFromDeviceBuffer(data); + hostDataBuffer.copyFromDeviceBufferAsync(data, stream); } HostColumnVector ret = new HostColumnVector(type, 
rows, Optional.of(nullCount), hostDataBuffer, hostValidityBuffer, hostOffsetsBuffer); @@ -5145,21 +5154,21 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { } else { if (data != null) { hostDataBuffer = hostMemoryAllocator.allocate(data.length); - hostDataBuffer.copyFromDeviceBuffer(data); + hostDataBuffer.copyFromDeviceBufferAsync(data, stream); } if (valid != null) { hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength()); - hostValidityBuffer.copyFromDeviceBuffer(valid); + hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream); } if (offsets != null) { hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.getLength()); - hostOffsetsBuffer.copyFromDeviceBuffer(offsets); + hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream); } List children = new ArrayList<>(); for (int i = 0; i < getNumChildren(); i++) { try (ColumnView childDevPtr = getChildColumnView(i)) { - children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator)); + children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator)); } } HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount), @@ -5192,10 +5201,19 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) { } } + /** Copy the data to host memory synchronously. */ public HostColumnVector copyToHost() { return copyToHost(DefaultHostMemoryAllocator.get()); } + /** + * Copy the data to the host asynchronously. The caller MUST synchronize on the stream + * before examining the result. + */ + public HostColumnVector copyToHostAsync(Cuda.Stream stream) { + return copyToHostAsync(stream, DefaultHostMemoryAllocator.get()); + } + /** * Calculate the total space required to copy the data to the host. This should be padded to * the alignment that the CPU requires. 
diff --git a/java/src/main/java/ai/rapids/cudf/HostColumnVector.java b/java/src/main/java/ai/rapids/cudf/HostColumnVector.java index 6b41d10fee3..61b11673957 100644 --- a/java/src/main/java/ai/rapids/cudf/HostColumnVector.java +++ b/java/src/main/java/ai/rapids/cudf/HostColumnVector.java @@ -92,6 +92,8 @@ public interface EventHandler { public HostColumnVector(DType type, long rows, Optional nullCount, HostMemoryBuffer hostDataBuffer, HostMemoryBuffer hostValidityBuffer, HostMemoryBuffer offsetBuffer, List nestedHcv) { + // NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be + // asynchronously written by the device. super(type, rows, nullCount, hostDataBuffer, hostValidityBuffer, offsetBuffer, nestedHcv); refCount = 0; incRefCountInternal(true); @@ -100,6 +102,8 @@ public HostColumnVector(DType type, long rows, Optional nullCount, HostColumnVector(DType type, long rows, Optional nullCount, HostMemoryBuffer hostDataBuffer, HostMemoryBuffer hostValidityBuffer, HostMemoryBuffer offsetBuffer) { + // NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be + // asynchronously written by the device. super(type, rows, nullCount, hostDataBuffer, hostValidityBuffer, offsetBuffer, new ArrayList<>()); assert !type.equals(DType.LIST) : "This constructor should not be used for list type"; if (nullCount.isPresent() && nullCount.get() > 0 && hostValidityBuffer == null) { diff --git a/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java b/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java index 95d209c0984..a225fbf34e1 100644 --- a/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java +++ b/java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,8 @@ public class HostColumnVectorCore implements AutoCloseable { public HostColumnVectorCore(DType type, long rows, Optional nullCount, HostMemoryBuffer data, HostMemoryBuffer validity, HostMemoryBuffer offsets, List nestedChildren) { + // NOTE: This constructor MUST NOT examine the contents of any host buffers, as they may be + // asynchronously written by the device. this.offHeap = new OffHeapState(data, validity, offsets); MemoryCleaner.register(this, offHeap); this.type = type; diff --git a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java index 666a8864003..89f363d2b29 100644 --- a/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java +++ b/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -907,8 +907,9 @@ private static ColumnBufferProvider[] providersFrom(ColumnVector[] columns) { boolean success = false; try { for (int i = 0; i < columns.length; i++) { - onHost[i] = columns[i].copyToHost(); + onHost[i] = columns[i].copyToHostAsync(Cuda.DEFAULT_STREAM); } + Cuda.DEFAULT_STREAM.sync(); ColumnBufferProvider[] ret = providersFrom(onHost, true); success = true; return ret;