From 661694351eb9eedea6cabdc3b1199daf4cfa9e6d Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 6 Jan 2022 11:03:03 -0800 Subject: [PATCH 1/3] addressed order first fit for pinned memory pool --- .../java/ai/rapids/cudf/PinnedMemoryPool.java | 77 +++++++++++-------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 865a668156f..16ec186f4d4 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -25,7 +25,9 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Objects; -import java.util.PriorityQueue; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -43,15 +45,14 @@ public final class PinnedMemoryPool implements AutoCloseable { private static Future initFuture = null; private final long pinnedPoolBase; - private final PriorityQueue freeHeap = new PriorityQueue<>(new SortedBySize()); + private final SortedSet freeHeap = new TreeSet<>(new SortedByAddress()); private int numAllocatedSections = 0; private long availableBytes; private static class SortedBySize implements Comparator { @Override public int compare(MemorySection s0, MemorySection s1) { - // We want the largest ones first... - int ret = Long.compare(s1.size, s0.size); + int ret = Long.compare(s0.size, s1.size); if (ret == 0) { ret = Long.compare(s0.baseAddress, s1.baseAddress); } @@ -59,6 +60,13 @@ public int compare(MemorySection s0, MemorySection s1) { } } + private static class SortedByAddress implements Comparator { + @Override + public int compare(MemorySection s0, MemorySection s1) { + return Long.compare(s0.baseAddress, s1.baseAddress); + } + } + private static class MemorySection { private long baseAddress; private long size; @@ -162,6 +170,7 @@ private static void freeInternal(MemorySection section) { /** * Initialize the pool. + * * @param poolSize size of the pool to initialize. */ public static synchronized void initialize(long poolSize) { @@ -170,8 +179,9 @@ public static synchronized void initialize(long poolSize) { /** * Initialize the pool. + * * @param poolSize size of the pool to initialize. - * @param gpuId gpu id to set to get memory pool from, -1 means to use default + * @param gpuId gpu id to set to get memory pool from, -1 means to use default */ public static synchronized void initialize(long poolSize, int gpuId) { if (isInitialized()) { @@ -207,11 +217,12 @@ public static synchronized void shutdown() { /** * Factory method to create a pinned host memory buffer. + * * @param bytes size in bytes to allocate * @return newly created buffer or null if insufficient pinned memory */ public static HostMemoryBuffer tryAllocate(long bytes) { - HostMemoryBuffer result = null; + HostMemoryBuffer result = null; PinnedMemoryPool pool = getSingleton(); if (pool != null) { result = pool.tryAllocateInternal(bytes); @@ -222,6 +233,7 @@ public static HostMemoryBuffer tryAllocate(long bytes) { /** * Factory method to create a host buffer but preferably pointing to pinned memory. * It is not guaranteed that the returned buffer will be pointer to pinned memory. + * * @param bytes size in bytes to allocate * @return newly created buffer */ @@ -235,6 +247,7 @@ public static HostMemoryBuffer allocate(long bytes) { /** * Get the number of bytes free in the pinned memory pool. + * * @return amount of free memory in bytes or 0 if the pool is not initialized */ public static long getAvailableBytes() { @@ -246,7 +259,7 @@ public static long getAvailableBytes() { } private PinnedMemoryPool(long poolSize, int gpuId) { - if (gpuId > -1 ) { + if (gpuId > -1) { // set the gpu device to use Cuda.setDevice(gpuId); Cuda.freeZero(); @@ -269,20 +282,29 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) { } // Align the allocation long alignedBytes = ((bytes + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT; - MemorySection largest = freeHeap.peek(); - if (largest.size < alignedBytes) { - log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest.size); + Optional firstFit = freeHeap.stream() + .filter(section -> section.size >= alignedBytes) + .findFirst(); + if (!firstFit.isPresent()) { + if (log.isDebugEnabled()) { + long largest = freeHeap.stream() + .max(new SortedBySize()) + .orElse(new MemorySection(0, 0)) + .size; + log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest); + } return null; } + MemorySection first = firstFit.get(); log.debug("Allocating {}/{} bytes pinned from {} FREE COUNT {} OUTSTANDING COUNT {}", - bytes, alignedBytes, largest, freeHeap.size(), numAllocatedSections); - freeHeap.remove(largest); + bytes, alignedBytes, first, freeHeap.size(), numAllocatedSections); + freeHeap.remove(first); MemorySection allocated; - if (largest.size == alignedBytes) { - allocated = largest; + if (first.size == alignedBytes) { + allocated = first; } else { - allocated = largest.splitOff(alignedBytes); - freeHeap.add(largest); + allocated = first.splitOff(alignedBytes); + freeHeap.add(first); } numAllocatedSections++; availableBytes -= allocated.size; @@ -293,25 +315,14 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) { private synchronized void free(MemorySection section) { log.debug("Freeing {} with {} outstanding {}", section, freeHeap, numAllocatedSections); - // This looks inefficient, but in reality it will only walk through the heap about 2 times. - // Because we keep entries up to date, each new entry will at most combine with one above it - // and one below it. That will happen in a single pass through the heap. We do a second pass - // simply out of an abundance of caution. - // Adding it in will be a log(N) operation because it is a heap. availableBytes += section.size; - boolean anyReplaced; - do { - anyReplaced = false; - Iterator it = freeHeap.iterator(); - while(it.hasNext()) { - MemorySection current = it.next(); - if (section.canCombine(current)) { - it.remove(); - anyReplaced = true; - section.combineWith(current); - } + for (Iterator it = freeHeap.iterator(); it.hasNext(); ) { + MemorySection current = it.next(); + if (section.canCombine(current)) { + it.remove(); + section.combineWith(current); } - } while(anyReplaced); + } freeHeap.add(section); numAllocatedSections--; log.debug("After freeing {} outstanding {}", freeHeap, numAllocatedSections); From b2b62c0127be7f5bc08812b853d21cec84ab097a Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Thu, 6 Jan 2022 11:49:54 -0800 Subject: [PATCH 2/3] minor cleanup --- .../java/ai/rapids/cudf/PinnedMemoryPool.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 16ec186f4d4..3b9665b6e97 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -52,11 +52,7 @@ public final class PinnedMemoryPool implements AutoCloseable { private static class SortedBySize implements Comparator { @Override public int compare(MemorySection s0, MemorySection s1) { - int ret = Long.compare(s0.size, s1.size); - if (ret == 0) { - ret = Long.compare(s0.baseAddress, s1.baseAddress); - } - return ret; + return Long.compare(s0.size, s1.size); } } @@ -287,11 +283,10 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) { .findFirst(); if (!firstFit.isPresent()) { if (log.isDebugEnabled()) { - long largest = freeHeap.stream() + MemorySection largest = freeHeap.stream() .max(new SortedBySize()) - .orElse(new MemorySection(0, 0)) - .size; - log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest); + .orElse(new MemorySection(0, 0)); + log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest.size); } return null; } @@ -316,7 +311,8 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) { private synchronized void free(MemorySection section) { log.debug("Freeing {} with {} outstanding {}", section, freeHeap, numAllocatedSections); availableBytes += section.size; - for (Iterator it = freeHeap.iterator(); it.hasNext(); ) { + Iterator it = freeHeap.iterator(); + while(it.hasNext()) { MemorySection current = it.next(); if (section.canCombine(current)) { it.remove(); From cf37f1fac02d17667a4f4b87b9058d3031846896 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Fri, 7 Jan 2022 09:42:28 -0800 Subject: [PATCH 3/3] update copyright year --- java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java index 3b9665b6e97..6eee935748e 100644 --- a/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java +++ b/java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.