Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use addressed-ordered first fit for the pinned memory pool #9989

Merged
merged 4 commits into from
Jan 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 45 additions & 38 deletions java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,9 @@
import java.util.Comparator;
jlowe marked this conversation as resolved.
Show resolved Hide resolved
import java.util.Iterator;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -43,19 +45,21 @@ public final class PinnedMemoryPool implements AutoCloseable {
private static Future<PinnedMemoryPool> initFuture = null;

private final long pinnedPoolBase;
private final PriorityQueue<MemorySection> freeHeap = new PriorityQueue<>(new SortedBySize());
private final SortedSet<MemorySection> freeHeap = new TreeSet<>(new SortedByAddress());
private int numAllocatedSections = 0;
private long availableBytes;

private static class SortedBySize implements Comparator<MemorySection> {
@Override
public int compare(MemorySection s0, MemorySection s1) {
// We want the largest ones first...
int ret = Long.compare(s1.size, s0.size);
if (ret == 0) {
ret = Long.compare(s0.baseAddress, s1.baseAddress);
}
return ret;
return Long.compare(s0.size, s1.size);
}
}

private static class SortedByAddress implements Comparator<MemorySection> {
@Override
public int compare(MemorySection s0, MemorySection s1) {
return Long.compare(s0.baseAddress, s1.baseAddress);
}
}

Expand Down Expand Up @@ -162,6 +166,7 @@ private static void freeInternal(MemorySection section) {

/**
* Initialize the pool.
*
* @param poolSize size of the pool to initialize.
*/
public static synchronized void initialize(long poolSize) {
Expand All @@ -170,8 +175,9 @@ public static synchronized void initialize(long poolSize) {

/**
* Initialize the pool.
*
* @param poolSize size of the pool to initialize.
* @param gpuId gpu id to set to get memory pool from, -1 means to use default
* @param gpuId gpu id to set to get memory pool from, -1 means to use default
*/
public static synchronized void initialize(long poolSize, int gpuId) {
if (isInitialized()) {
Expand Down Expand Up @@ -207,11 +213,12 @@ public static synchronized void shutdown() {

/**
* Factory method to create a pinned host memory buffer.
*
* @param bytes size in bytes to allocate
* @return newly created buffer or null if insufficient pinned memory
*/
public static HostMemoryBuffer tryAllocate(long bytes) {
HostMemoryBuffer result = null;
HostMemoryBuffer result = null;
PinnedMemoryPool pool = getSingleton();
if (pool != null) {
result = pool.tryAllocateInternal(bytes);
Expand All @@ -222,6 +229,7 @@ public static HostMemoryBuffer tryAllocate(long bytes) {
/**
* Factory method to create a host buffer but preferably pointing to pinned memory.
* It is not guaranteed that the returned buffer will be pointer to pinned memory.
*
* @param bytes size in bytes to allocate
* @return newly created buffer
*/
Expand All @@ -235,6 +243,7 @@ public static HostMemoryBuffer allocate(long bytes) {

/**
* Get the number of bytes free in the pinned memory pool.
*
* @return amount of free memory in bytes or 0 if the pool is not initialized
*/
public static long getAvailableBytes() {
Expand All @@ -246,7 +255,7 @@ public static long getAvailableBytes() {
}

private PinnedMemoryPool(long poolSize, int gpuId) {
if (gpuId > -1 ) {
if (gpuId > -1) {
// set the gpu device to use
Cuda.setDevice(gpuId);
Cuda.freeZero();
Expand All @@ -269,20 +278,28 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) {
}
// Align the allocation
long alignedBytes = ((bytes + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
MemorySection largest = freeHeap.peek();
if (largest.size < alignedBytes) {
log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest.size);
Optional<MemorySection> firstFit = freeHeap.stream()
.filter(section -> section.size >= alignedBytes)
.findFirst();
if (!firstFit.isPresent()) {
if (log.isDebugEnabled()) {
MemorySection largest = freeHeap.stream()
.max(new SortedBySize())
.orElse(new MemorySection(0, 0));
log.debug("Insufficient pinned memory. {} needed, {} found", alignedBytes, largest.size);
}
return null;
}
MemorySection first = firstFit.get();
log.debug("Allocating {}/{} bytes pinned from {} FREE COUNT {} OUTSTANDING COUNT {}",
bytes, alignedBytes, largest, freeHeap.size(), numAllocatedSections);
freeHeap.remove(largest);
bytes, alignedBytes, first, freeHeap.size(), numAllocatedSections);
freeHeap.remove(first);
MemorySection allocated;
if (largest.size == alignedBytes) {
allocated = largest;
if (first.size == alignedBytes) {
allocated = first;
} else {
allocated = largest.splitOff(alignedBytes);
freeHeap.add(largest);
allocated = first.splitOff(alignedBytes);
freeHeap.add(first);
}
numAllocatedSections++;
availableBytes -= allocated.size;
Expand All @@ -293,25 +310,15 @@ private synchronized HostMemoryBuffer tryAllocateInternal(long bytes) {

private synchronized void free(MemorySection section) {
log.debug("Freeing {} with {} outstanding {}", section, freeHeap, numAllocatedSections);
// This looks inefficient, but in reality it will only walk through the heap about 2 times.
// Because we keep entries up to date, each new entry will at most combine with one above it
// and one below it. That will happen in a single pass through the heap. We do a second pass
// simply out of an abundance of caution.
// Adding it in will be a log(N) operation because it is a heap.
availableBytes += section.size;
boolean anyReplaced;
do {
anyReplaced = false;
Iterator<MemorySection> it = freeHeap.iterator();
while(it.hasNext()) {
MemorySection current = it.next();
if (section.canCombine(current)) {
it.remove();
anyReplaced = true;
section.combineWith(current);
}
Iterator<MemorySection> it = freeHeap.iterator();
while(it.hasNext()) {
MemorySection current = it.next();
if (section.canCombine(current)) {
it.remove();
section.combineWith(current);
}
} while(anyReplaced);
}
freeHeap.add(section);
numAllocatedSections--;
log.debug("After freeing {} outstanding {}", freeHeap, numAllocatedSections);
Expand Down