Skip to content

Commit

Permalink
Cleanup local channel memory, fixes #798
Browse files Browse the repository at this point in the history
  • Loading branch information
diego.salvi committed Jan 16, 2023
1 parent aa7e077 commit 7df23ad
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -186,17 +187,17 @@ public void messageSent(Throwable error) {
});
}

private volatile boolean closed = false;
private AtomicBoolean closed = new AtomicBoolean(false);

protected abstract String describeSocket();
protected abstract void doClose();

@Override
public final void close() {
if (closed) {
if (!closed.compareAndSet(false,true)) {
// Already closed or in closing procedure
return;
}
closed = true;
LOGGER.log(Level.FINE, "{0}: closing", this);
String socketDescription = describeSocket();
doClose();
Expand All @@ -205,7 +206,7 @@ public final void close() {

@Override
public final boolean isClosed() {
return closed;
return closed.get();
}
private void failPendingMessages(String socketDescription) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
public class LocalVMChannel extends AbstractChannel implements Comparable<LocalVMChannel> {

private final ServerSideLocalVMChannel serverSideChannel;
private final LocalVMChannelAcceptor parent;

LocalVMChannel(String name, ChannelEventListener clientSidePeer, ExecutorService executorService) {
LocalVMChannel(String name, ChannelEventListener clientSidePeer, ExecutorService executorService, LocalVMChannelAcceptor parent) {
super(name, ADDRESS_JVM_LOCAL, executorService);
setMessagesReceiver(clientSidePeer);
serverSideChannel = new ServerSideLocalVMChannel(ADDRESS_JVM_LOCAL, ADDRESS_JVM_LOCAL, executorService);
this.parent = parent;
}

public Channel getServerSideChannel() {
Expand Down Expand Up @@ -75,6 +77,7 @@ protected String describeSocket() {

@Override
protected void doClose() {
parent.deregister(this);
// emulate Netty on channel close
channelClosed();
serverSideChannel.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,19 @@ public void close() {
}

public Channel connect(String name, ChannelEventListener clientSidePeer, ExecutorService executorService) {
LocalVMChannel channel = new LocalVMChannel(name, clientSidePeer, executorService);
LocalVMChannel channel = new LocalVMChannel(name, clientSidePeer, executorService, this);
acceptor.createConnection(channel.getServerSideChannel());
channels.add(channel);
return channel;
}

void deregister(LocalVMChannel channel) {
channels.remove(channel);
}

// Visible for testing
Set<LocalVMChannel> channels() {
return channels;
}

}
43 changes: 33 additions & 10 deletions herddb-net/src/test/java/herddb/network/netty/LocalChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import herddb.proto.Pdu;
import herddb.utils.TestUtils;
import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -66,25 +67,47 @@ public void channelClosed(Channel channel) {
});
acceptor.start();
assertNotNull(LocalServerRegistry.getLocalServer(NetworkUtils.getAddress(addr), addr.getPort()));

ExecutorService executor = Executors.newCachedThreadPool();
try (Channel client = NettyConnector.connect(addr.getHostName(), addr.getPort(), true, 0, 0, new ChannelEventListener() {
try {
try (Channel client = NettyConnector.connect(addr.getHostName(), addr.getPort(), true, 0, 0, new ChannelEventListener() {

@Override
public void channelClosed(Channel channel) {
System.out.println("client channelClosed");
@Override
public void channelClosed(Channel channel) {
System.out.println("client channelClosed");

}
}, executor, null)) {
for (int i = 0; i < 100; i++) {
ByteBuf buffer = buildAckRequest(i);
try (Pdu result = client.sendMessageWithPduReply(i, buffer, 10000)) {
assertEquals(Pdu.TYPE_ACK, result.type);
}
}
}
}, executor, null)) {
for (int i = 0; i < 100; i++) {
ByteBuf buffer = buildAckRequest(i);
try (Pdu result = client.sendMessageWithPduReply(i, buffer, 10000)) {
assertEquals(Pdu.TYPE_ACK, result.type);

try (Channel client = NettyConnector.connect(addr.getHostName(), addr.getPort(), true, 0, 0, new ChannelEventListener() {

@Override
public void channelClosed(Channel channel) {
System.out.println("client channelClosed");

}
}, executor, null)) {
for (int i = 0; i < 100; i++) {
ByteBuf buffer = buildAckRequest(i);
try (Pdu result = client.sendMessageWithPduReply(i, buffer, 10000)) {
assertEquals(Pdu.TYPE_ACK, result.type);
}
}
}

LocalVMChannelAcceptor localAcceptor = LocalServerRegistry.getLocalServer(NetworkUtils.getAddress(addr), addr.getPort());
assertNotNull(localAcceptor);
assertTrue(localAcceptor.channels().isEmpty());
} finally {
executor.shutdown();
}

}
assertNull(LocalServerRegistry.getLocalServer(NetworkUtils.getAddress(addr), addr.getPort()));
}
Expand Down

0 comments on commit 7df23ad

Please sign in to comment.