diff --git a/java/src/main/java/ai/rapids/cudf/HostMemoryReservation.java b/java/src/main/java/ai/rapids/cudf/HostMemoryReservation.java new file mode 100644 index 00000000000..72c2e659372 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/HostMemoryReservation.java @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package ai.rapids.cudf; + +/** + * Represents some amount of host memory that has been reserved. A reservation guarantees that one + * or more allocations up to the reserved amount, minus padding for alignment, will succeed. A + * reservation typically guarantees the amount can be allocated once, meaning when a buffer + * allocated from a reservation is freed it is not returned to the reservation, but to the pool of + * memory the reservation originally came from. If more memory is allocated from the reservation + * an OutOfMemoryError may be thrown, but it is not guaranteed to happen. + * + * When the reservation is closed any unused reservation will be returned to the pool of memory + * the reservation came from. 
+ */ +public interface HostMemoryReservation extends HostMemoryAllocator, AutoCloseable {} diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 969946a9533..9ce72ba237e 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,13 +37,14 @@ */ public final class PinnedMemoryPool implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(PinnedMemoryPool.class); - private static final long ALIGNMENT = 8; + private static final long ALIGNMENT = ColumnView.hostPaddingSizeInBytes(); // These static fields should only ever be accessed when class-synchronized. // Do NOT use singleton_ directly! Use the getSingleton accessor instead. private static volatile PinnedMemoryPool singleton_ = null; private static Future initFuture = null; + private final long totalPoolSize; private final long pinnedPoolBase; private final SortedSet freeHeap = new TreeSet<>(new SortedByAddress()); private int numAllocatedSections = 0; @@ -164,6 +165,14 @@ private static void freeInternal(MemorySection section) { Objects.requireNonNull(getSingleton()).free(section); } + /** + * Used to indicate that memory was allocated from a reservation. This is primarily for + * keeping track of outstanding allocations. + */ + private static void reserveAllocInternal(MemorySection section) { + Objects.requireNonNull(getSingleton()).reserveAllocHappened(section); + } + /** * Initialize the pool. * @@ -226,6 +235,21 @@ public static HostMemoryBuffer tryAllocate(long bytes) { return result; } + /** + * Factory method to create a pinned host memory reservation. 
+ * + * @param bytes size in bytes to reserve + * @return newly created reservation or null if insufficient pinned memory to cover it. + */ + public static HostMemoryReservation tryReserve(long bytes) { + HostMemoryReservation result = null; + PinnedMemoryPool pool = getSingleton(); + if (pool != null) { + result = pool.tryReserveInternal(bytes); + } + return result; + } + /** * Factory method to create a host buffer but preferably pointing to pinned memory. * It is not guaranteed that the returned buffer will be pointer to pinned memory. @@ -233,7 +257,7 @@ public static HostMemoryBuffer tryAllocate(long bytes) { * @param bytes size in bytes to allocate * @return newly created buffer */ - public static HostMemoryBuffer allocate(long bytes, HostMemoryAllocator hostMemoryAllocator) { + public static HostMemoryBuffer allocate(long bytes, HostMemoryAllocator hostMemoryAllocator) { HostMemoryBuffer result = tryAllocate(bytes); if (result == null) { result = hostMemoryAllocator.allocate(bytes, false); @@ -241,6 +265,13 @@ public static HostMemoryBuffer allocate(long bytes, HostMemoryAllocator hostMem return result; } + /** + * Factory method to create a host buffer but preferably pointing to pinned memory. + * It is not guaranteed that the returned buffer will be a pointer to pinned memory. + * + * @param bytes size in bytes to allocate + * @return newly created buffer + */ public static HostMemoryBuffer allocate(long bytes) { return allocate(bytes, DefaultHostMemoryAllocator.get()); } @@ -258,12 +289,24 @@ public static long getAvailableBytes() { return 0; } + /** + * Get the number of bytes that the pinned memory pool was allocated with. 
+ */ + public static long getTotalPoolSizeBytes() { + PinnedMemoryPool pool = getSingleton(); + if (pool != null) { + return pool.getTotalPoolSizeInternal(); + } + return 0; + } + private PinnedMemoryPool(long poolSize, int gpuId) { if (gpuId > -1) { // set the gpu device to use Cuda.setDevice(gpuId); Cuda.freeZero(); } + this.totalPoolSize = poolSize; this.pinnedPoolBase = Cuda.hostAllocPinned(poolSize); freeHeap.add(new MemorySection(pinnedPoolBase, poolSize)); this.availableBytes = poolSize; @@ -271,32 +314,42 @@ private PinnedMemoryPool(long poolSize, int gpuId) { @Override public void close() { - assert numAllocatedSections == 0; + assert numAllocatedSections == 0 : "Leaked " + numAllocatedSections + " pinned allocations"; Cuda.freePinned(pinnedPoolBase); } - private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) { + /** + * Pads a length of bytes to the alignment the CPU wants in the worst case. This helps to + * calculate the size needed for a reservation if there are multiple buffers. + * @param bytes the size in bytes + * @return the new padded size in bytes. + */ + public static long padToCpuAlignment(long bytes) { + return ((bytes + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT; + } + + private synchronized MemorySection tryGetInternal(long bytes, String what) { if (freeHeap.isEmpty()) { log.debug("No free pinned memory left"); return null; } // Align the allocation - long alignedBytes = ((bytes + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT; + long alignedBytes = padToCpuAlignment(bytes); Optional firstFit = freeHeap.stream() - .filter(section -> section.size >= alignedBytes) - .findFirst(); + .filter(section -> section.size >= alignedBytes) + .findFirst(); if (!firstFit.isPresent()) { if (log.isDebugEnabled()) { MemorySection largest = freeHeap.stream() - .max(new SortedBySize()) - .orElse(new MemorySection(0, 0)); + .max(new SortedBySize()) + .orElse(new MemorySection(0, 0)); log.debug("Insufficient pinned memory. 
{} needed, {} found", alignedBytes, largest.size); } return null; } MemorySection first = firstFit.get(); - log.debug("Allocating {}/{} bytes pinned from {} FREE COUNT {} OUTSTANDING COUNT {}", - bytes, alignedBytes, first, freeHeap.size(), numAllocatedSections); + log.debug("{} {}/{} bytes pinned from {} FREE COUNT {} OUTSTANDING COUNT {}", + what, bytes, alignedBytes, first, freeHeap.size(), numAllocatedSections); freeHeap.remove(first); MemorySection allocated; if (first.size == alignedBytes) { @@ -307,9 +360,74 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) { } numAllocatedSections++; availableBytes -= allocated.size; - log.debug("Allocated {} free {} outstanding {}", allocated, freeHeap, numAllocatedSections); - return new HostMemoryBuffer(allocated.baseAddress, bytes, - new PinnedHostBufferCleaner(allocated, bytes)); + log.debug("{} {} free {} outstanding {}", what, allocated, freeHeap, numAllocatedSections); + return allocated; + } + + private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) { + MemorySection allocated = tryGetInternal(bytes, "allocate"); + if (allocated == null) { + return null; + } else { + return new HostMemoryBuffer(allocated.baseAddress, bytes, + new PinnedHostBufferCleaner(allocated, bytes)); + } + } + + private class PinnedReservation implements HostMemoryReservation { + private MemorySection section = null; + + public PinnedReservation(MemorySection section) { + this.section = section; + } + + @Override + public synchronized HostMemoryBuffer allocate(long bytes, boolean preferPinned) { + return this.allocate(bytes); + } + + @Override + public synchronized HostMemoryBuffer allocate(long bytes) { + if (section == null || section.size < bytes) { + throw new OutOfMemoryError("Reservation didn't have enough space " + bytes + " / " + + (section == null ? 
0 : section.size)); + } + long alignedSize = padToCpuAlignment(bytes); + MemorySection allocated; + if (section.size >= bytes && section.size <= alignedSize) { + allocated = section; + section = null; + // No need for reserveAllocInternal because the original section is already tracked + } else { + allocated = section.splitOff(alignedSize); + PinnedMemoryPool.reserveAllocInternal(allocated); + } + return new HostMemoryBuffer(allocated.baseAddress, bytes, + new PinnedHostBufferCleaner(allocated, bytes)); + } + + @Override + public synchronized void close() throws Exception { + if (section != null) { + try { + PinnedMemoryPool.freeInternal(section); + } finally { + // Always mark the resource as freed even if an exception is thrown. + // We cannot know how far it progressed before the exception, and + // therefore it is unsafe to retry. + section = null; + } + } + } + } + + private HostMemoryReservation tryReserveInternal(long bytes) { + MemorySection allocated = tryGetInternal(bytes, "reserve"); + if (allocated == null) { + return null; + } else { + return new PinnedReservation(allocated); + } } private synchronized void free(MemorySection section) { @@ -328,7 +446,17 @@ private synchronized void free(MemorySection section) { log.debug("After freeing {} outstanding {}", freeHeap, numAllocatedSections); } + private synchronized void reserveAllocHappened(MemorySection section) { + if (section != null && section.size > 0) { + numAllocatedSections++; + } + } + private synchronized long getAvailableBytesInternal() { return this.availableBytes; } + + private long getTotalPoolSizeInternal() { + return this.totalPoolSize; + } }