Skip to content

Commit

Permalink
Merge #3575 into 1.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Dec 31, 2024
2 parents b8a012b + f054423 commit ebd8f36
Showing 1 changed file with 33 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2023-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
import reactor.util.Logger;
import reactor.util.Loggers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -47,13 +49,18 @@ public final class CancelReceiverHandlerTest extends ChannelInboundHandlerAdapte
*/
final Runnable cancelAction;

/**
* List with incoming message body parts which are expected to be all released.
*/
private final List<ByteBuf> buffers;

/**
* Flag set to true when the cancel action has already been invoked.
*/
private final AtomicBoolean cancelled = new AtomicBoolean();

/**
* Latch initialized with the number of incoming message body parts which are exected to be all released.
* Latch initialized with the number of incoming message body parts which are expected to be all released.
*/
private final CountDownLatch expectedReleaseCount;

Expand All @@ -75,28 +82,43 @@ public CancelReceiverHandlerTest(Runnable cancelAction) {
* message buffers are all released.
*/
public CancelReceiverHandlerTest(Runnable cancelAction, int expectedReleaseCount) {
this.buffers = new ArrayList<>(expectedReleaseCount);
this.cancelAction = cancelAction;
this.expectedReleaseCount = new CountDownLatch(expectedReleaseCount);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// If the incoming message is a kind of a buffer, calls the cancel action.
// If the incoming message is ByteBuf/ByteBufHolder, calls the cancel action.
ByteBuf buf = (msg instanceof ByteBufHolder) ? ((ByteBufHolder) msg).content() :
((msg instanceof ByteBuf) ? (ByteBuf) msg : null);
if (buf != null && cancelled.compareAndSet(false, true)) {
log.debug("Executing cancel action");
cancelAction.run();
if (buf != null) {
if (cancelled.compareAndSet(false, true)) {
log.debug("Executing cancel action");
cancelAction.run();
}
if (!(buf instanceof EmptyByteBuf)) {
buffers.add(buf);
}
}

ctx.fireChannelRead(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
try {
ctx.fireChannelRead(msg);
ctx.fireChannelReadComplete();
}
finally {
if (buf != null && !(buf instanceof EmptyByteBuf) && buf.refCnt() == 0) {
expectedReleaseCount.countDown();
if (expectedReleaseCount.getCount() == 0) {
log.debug("All received messages have been released.");
for (int i = 0; i < buffers.size(); i++) {
ByteBuf buf = buffers.get(i);
if (buf.refCnt() == 0) {
buffers.remove(i);
expectedReleaseCount.countDown();
if (expectedReleaseCount.getCount() == 0) {
log.debug("All received messages have been released.");
}
}
}
}
Expand Down

0 comments on commit ebd8f36

Please sign in to comment.