Skip to content

Commit

Permalink
Java cleaner synchronization (#7474)
Browse files Browse the repository at this point in the history
Add synchronization in `cleanImpl` and `close` in various places where race conditions could exist, and also within the `MemoryCleaner` to address some concurrent modification issues we've seen in tests while shutting down (i.e. invoking the cleaner) (i.e. NVIDIA/spark-rapids#1797)

Authors:
  - Alessandro Bellina (@abellina)

Approvers:
  - Robert (Bobby) Evans (@revans2)
  - Jason Lowe (@jlowe)

URL: #7474
  • Loading branch information
abellina authored Mar 4, 2021
1 parent d619f77 commit 85c1f8f
Show file tree
Hide file tree
Showing 11 changed files with 26 additions and 20 deletions.
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/ColumnVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ public String toString() {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long address = 0;

Expand Down
8 changes: 4 additions & 4 deletions java/src/main/java/ai/rapids/cudf/Cuda.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private static class StreamCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = stream;
if (stream != CUDA_STREAM_DEFAULT &&
Expand Down Expand Up @@ -114,7 +114,7 @@ public String toString() {
}

@Override
public void close() {
public synchronized void close() {
if (cleaner != null) {
cleaner.delRef();
}
Expand All @@ -139,7 +139,7 @@ private static class EventCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = event;
if (event != 0) {
Expand Down Expand Up @@ -233,7 +233,7 @@ public String toString() {
}

@Override
public void close() {
public synchronized void close() {
cleaner.delRef();
if (closed) {
cleaner.logRefCountDebug("double free " + this);
Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/ai/rapids/cudf/DeviceMemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private static final class DeviceBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = address;
if (address != 0) {
Expand Down Expand Up @@ -79,7 +79,7 @@ private static final class RmmDeviceBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
if (rmmBufferAddress != 0) {
Rmm.freeDeviceBuffer(rmmBufferAddress);
Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/ai/rapids/cudf/HostColumnVectorCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public long getHostMemorySize() {
* Close method for the column
*/
@Override
public void close() {
public synchronized void close() {
for (HostColumnVectorCore child : children) {
if (child != null) {
child.close();
Expand Down Expand Up @@ -553,7 +553,7 @@ protected static final class OffHeapState extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
if (data != null || valid != null || offsets != null) {
try {
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static final class HostBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = address;
if (address != 0) {
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/MemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static final class SlicedBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
if (parent != null) {
if (logErrorIfNotClean) {
log.error("A SLICED BUFFER WAS LEAKED(ID: " + id + " parent: " + parent + ")");
Expand Down
12 changes: 9 additions & 3 deletions java/src/main/java/ai/rapids/cudf/MemoryCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,25 @@ public Cleaner() {

public final void addRef() {
if (REF_COUNT_DEBUG && refCountDebug != null) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("INC"));
synchronized(this) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("INC"));
}
}
}

public final void delRef() {
if (REF_COUNT_DEBUG && refCountDebug != null) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("DEC"));
synchronized(this) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("DEC"));
}
}
}

public final void logRefCountDebug(String message) {
if (REF_COUNT_DEBUG && refCountDebug != null) {
log.error("{} (ID: {}): {}", message, id, MemoryCleaner.stringJoin("\n", refCountDebug));
synchronized(this) {
log.error("{} (ID: {}): {}", message, id, MemoryCleaner.stringJoin("\n", refCountDebug));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private static final class PinnedHostBufferCleaner extends MemoryBuffer.MemoryBu
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = 0;
if (section != null) {
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/Scalar.java
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ private static class OffHeapState extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
if (scalarHandle != 0) {
if (logErrorIfNotClean) {
LOG.error("A SCALAR WAS LEAKED(ID: " + id + " " + Long.toHexString(scalarHandle) + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public boolean isLZ4Metadata() {
}

@Override
public void close() {
public synchronized void close() {
if (!closed) {
cleaner.delRef();
cleaner.clean(false);
Expand All @@ -200,7 +200,7 @@ private static class BatchedMetadataCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long address = metadata;
if (metadata != 0) {
Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/ai/rapids/cudf/nvcomp/Decompressor.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public boolean isLZ4Metadata() {
}

@Override
public void close() {
public synchronized void close() {
if (!closed) {
cleaner.delRef();
cleaner.clean(false);
Expand All @@ -135,7 +135,7 @@ private static class MetadataCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long address = metadata;
if (metadata != 0) {
Expand Down

0 comments on commit 85c1f8f

Please sign in to comment.