Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java cleaner synchronization [skip ci] #7474

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/ColumnVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ public String toString() {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long address = 0;

Expand Down
8 changes: 4 additions & 4 deletions java/src/main/java/ai/rapids/cudf/Cuda.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private static class StreamCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = stream;
if (stream != CUDA_STREAM_DEFAULT &&
Expand Down Expand Up @@ -114,7 +114,7 @@ public String toString() {
}

@Override
public void close() {
public synchronized void close() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to synchronize closing a stream, but none of the other classes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the other classes synchronized on close, except Stream and Event. As of this PR, these don't synchronize:

  • HostColumnVectorCore (as far as as I can tell its close is overwritten by subclasses)
  • nvcomp: BatchedMetadata and Metadata. Neither of these are called in a multi-threaded context.

I think we can synchronize close for the nvcomp ones, that seems like it could be a miss later. If you think this is unnecessary let me know and I can revert parts of this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • HostColumnVectorCore (as far as as I can tell its close is overwritten by subclasses)

Child columns are instances of this class. So we should probably add synchronized to the close in HostColumnVectorCore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@revans2 did you want me to make a change around this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok done. I need to re-run my tests after upmerging. FYI.

if (cleaner != null) {
cleaner.delRef();
}
Expand All @@ -139,7 +139,7 @@ private static class EventCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = event;
if (event != 0) {
Expand Down Expand Up @@ -233,7 +233,7 @@ public String toString() {
}

@Override
public void close() {
public synchronized void close() {
cleaner.delRef();
if (closed) {
cleaner.logRefCountDebug("double free " + this);
Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/ai/rapids/cudf/DeviceMemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private static final class DeviceBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = address;
if (address != 0) {
Expand Down Expand Up @@ -79,7 +79,7 @@ private static final class RmmDeviceBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
if (rmmBufferAddress != 0) {
Rmm.freeDeviceBuffer(rmmBufferAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ protected static final class OffHeapState extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
if (data != null || valid != null || offsets != null) {
try {
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/HostMemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static final class HostBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = address;
if (address != 0) {
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/MemoryBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static final class SlicedBufferCleaner extends MemoryBufferCleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
if (parent != null) {
if (logErrorIfNotClean) {
log.error("A SLICED BUFFER WAS LEAKED(ID: " + id + " parent: " + parent + ")");
Expand Down
12 changes: 9 additions & 3 deletions java/src/main/java/ai/rapids/cudf/MemoryCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,25 @@ public Cleaner() {

public final void addRef() {
if (REF_COUNT_DEBUG && refCountDebug != null) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("INC"));
synchronized(this) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("INC"));
}
}
}

public final void delRef() {
if (REF_COUNT_DEBUG && refCountDebug != null) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("DEC"));
synchronized(this) {
refCountDebug.add(new MemoryCleaner.RefCountDebugItem("DEC"));
}
}
}

public final void logRefCountDebug(String message) {
if (REF_COUNT_DEBUG && refCountDebug != null) {
log.error("{} (ID: {}): {}", message, id, MemoryCleaner.stringJoin("\n", refCountDebug));
synchronized(this) {
log.error("{} (ID: {}): {}", message, id, MemoryCleaner.stringJoin("\n", refCountDebug));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/PinnedMemoryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private static final class PinnedHostBufferCleaner extends MemoryBuffer.MemoryBu
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long origAddress = 0;
if (section != null) {
Expand Down
2 changes: 1 addition & 1 deletion java/src/main/java/ai/rapids/cudf/Scalar.java
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ private static class OffHeapState extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
if (scalarHandle != 0) {
if (logErrorIfNotClean) {
LOG.error("A SCALAR WAS LEAKED(ID: " + id + " " + Long.toHexString(scalarHandle) + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private static class BatchedMetadataCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long address = metadata;
if (metadata != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private static class MetadataCleaner extends MemoryCleaner.Cleaner {
}

@Override
protected boolean cleanImpl(boolean logErrorIfNotClean) {
protected synchronized boolean cleanImpl(boolean logErrorIfNotClean) {
boolean neededCleanup = false;
long address = metadata;
if (metadata != 0) {
Expand Down