Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
add back press for kop server (#231)
Browse files Browse the repository at this point in the history
kop data flow works like this:

kafka client(1) -> kop/broker (2)-> bookkeeper

Right now we can use throttler to slow down (2) data flood, but (1) doesn't have any protection. This pr is used to enable/disable auto read for (1) channel. With this, we can controll the speed of channel in case of potential oom issue.
  • Loading branch information
yuanboliu authored Nov 12, 2020
1 parent f81f7e4 commit 9a4be21
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -81,6 +82,15 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (log.isDebugEnabled()) {
log.debug("Channel writability has changed to: {}", ctx.channel().isWritable());
}
if (ctx.channel().isWritable()) {
// set auto read to true if channel is writable.
ctx.channel().config().setAutoRead(true);
} else {
log.warn("channel is not writable, disable auto reading for back pressing");
ctx.channel().config().setAutoRead(false);
ctx.flush();
}
ctx.fireChannelWritabilityChanged();
}

// turn input ByteBuf msg, which send from client side, into KafkaHeaderAndRequest
Expand Down Expand Up @@ -455,4 +465,9 @@ public static ResponseAndRequest of(CompletableFuture<AbstractResponse> response
this.request = request;
}
}

@VisibleForTesting
public ChannelHandlerContext getCtx() {
return ctx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
import io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.KafkaHeaderAndResponse;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -42,6 +48,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -107,6 +114,36 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testAutoReadEnableDisable() {
final ByteBuf buffer = Unpooled.copiedBuffer("Test", CharsetUtil.US_ASCII);
final EmbeddedChannel channel = new EmbeddedChannel(handler);
ByteBuf[] buffers = new ByteBuf[1000000];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = buffer.duplicate().retain();
}
AtomicBoolean isChannelWritable = new AtomicBoolean(true);
ExecutorService service = Executors.newSingleThreadExecutor();

service.execute(() -> {
int count = 0;
while (count < 10) {
boolean isWritable = handler.getCtx().channel().isWritable();
if (!isWritable) {
isChannelWritable.set(false);
break;
}
count += 1;
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (Exception ignored) {}

}
});
channel.writeOutbound(buffers);
Assert.assertFalse(isChannelWritable.get());
}

@Test
public void testByteBufToRequest() {
int correlationId = 7777;
Expand Down

0 comments on commit 9a4be21

Please sign in to comment.