
Use LinkedBlockingQueue as the container of ResponseAndRequest #500
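The core of the change: the decoder previously tracked pending requests in two containers, an unbounded ConcurrentLinkedQueue for response ordering plus a FakeArrayBlockingQueue for capacity control, and this patch collapses them into a single LinkedBlockingQueue, which provides FIFO ordering and a bounded, blocking put() in one structure. A minimal standalone sketch of that behavior (demo code, not part of the patch):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedFifoDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 2: a third put() blocks until a consumer takes an element,
        // the backpressure the old FakeArrayBlockingQueue provided.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.put("request-1");
        queue.put("request-2");

        Thread consumer = new Thread(() -> {
            try {
                // Elements leave in FIFO order, the ordering guarantee the old
                // ConcurrentLinkedQueue provided for responses.
                System.out.println(queue.take()); // request-1
                System.out.println(queue.take()); // request-2
                System.out.println(queue.take()); // request-3
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("request-3"); // may block briefly until the consumer frees a slot
        consumer.join();
    }
}
```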

This file was deleted.

@@ -16,16 +16,16 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;

import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.naming.AuthenticationException;
@@ -55,11 +55,8 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
protected SocketAddress remoteAddress;
@Getter
protected AtomicBoolean isActive = new AtomicBoolean(false);
// Queue to ensure responses are returned to the client in request order.
private final Queue<ResponseAndRequest> responseQueue = Queues.newConcurrentLinkedQueue();

// Queue to cap the number of in-flight requests at the given capacity.
private final FakeArrayBlockingQueue requestQueue;
// Queue that keeps responses in request order and bounds the number of pending requests
private final LinkedBlockingQueue<ResponseAndRequest> requestQueue;

protected final RequestStats requestStats;
@Getter
@@ -68,7 +65,7 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
public KafkaCommandDecoder(StatsLogger statsLogger, KafkaServiceConfiguration kafkaConfig) {
this.requestStats = new RequestStats(statsLogger);
this.kafkaConfig = kafkaConfig;
this.requestQueue = new FakeArrayBlockingQueue(kafkaConfig.getMaxQueuedRequests());
this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests());
}

@Override
@@ -86,9 +83,19 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}

protected void close() {
// Clear the request queue
log.info("close channel {} with {} pending responses", ctx.channel(), requestQueue.size());
while (true) {
final ResponseAndRequest responseAndRequest = requestQueue.poll();
if (responseAndRequest != null) {
// Trigger writeAndFlushResponseToClient immediately, but it will do nothing because isActive is false
responseAndRequest.getResponseFuture().cancel(true);
} else {
// queue is empty
break;
}
}
ctx.close();
requestQueue.clear();
responseQueue.clear();
}

@Override
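For context on the drain loop in close() above: cancel(true) on a pending CompletableFuture completes it with a CancellationException, which is exactly what the whenComplete callback installed in channelRead (next hunk) checks for before logging at debug level instead of flushing a response. A small standalone sketch of that JDK behavior:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

public class CancelDemo {
    public static void main(String[] args) {
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        responseFuture.whenComplete((response, e) -> {
            if (e instanceof CancellationException) {
                // This branch runs when close() cancels a pending request.
                System.out.println("request cancelled, no response flushed");
            }
        });
        responseFuture.cancel(true); // completes the future with CancellationException
    }
}
```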
@@ -154,7 +161,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
remoteAddress = channel.remoteAddress();
}

requestQueue.put();
requestStats.getRequestQueueSize().incrementAndGet();

final long timeBeforeParse = MathUtils.nowInNano();
@@ -171,18 +177,29 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
responseFuture.whenComplete((response, e) -> {
// Do nothing if this callback fires after the responseFuture has expired or been cancelled
if (e != null) {
if (e instanceof CancellationException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Request {} is cancelled",
ctx.channel(), kafkaHeaderAndRequest.getHeader());
}
} else {
log.error("[{}] Request {} is completed with exception",
ctx.channel(), kafkaHeaderAndRequest.getHeader(), e);
}
return;
}
if (response == null) {
log.error("[{}] Unexpected null completed future for request {}",
ctx.channel(), kafkaHeaderAndRequest.getHeader());
return;
}
ctx.channel().eventLoop().execute(() -> {
writeAndFlushResponseToClient(channel);
// release requestQueue room
requestQueue.poll(); // would a thrown exception here kill the eventLoop thread?
});
});

responseQueue.add(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
// potentially blocking until there is room in the queue for the request.
requestQueue.put(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
requestStats.getResponseQueueSize().incrementAndGet();

if (!isActive.get()) {
@@ -294,9 +311,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
protected void writeAndFlushResponseToClient(Channel channel) {
// loop from first responseFuture.
while (isActive.get()) {
final ResponseAndRequest responseAndRequest = responseQueue.peek();
final ResponseAndRequest responseAndRequest = requestQueue.peek();
if (responseAndRequest == null) {
// responseQueue is empty
// requestQueue is empty
break;
}

@@ -313,7 +330,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
}
break;
} else {
if (responseQueue.remove(responseAndRequest)) {
if (requestQueue.remove(responseAndRequest)) {
responseAndRequest.updateStats(requestStats);
requestStats.getResponseQueueSize().decrementAndGet();
} else { // it has been removed by another thread, skip this element
@@ -331,7 +348,7 @@
if (expired) {
log.error("[{}] request {} is not completed for {} ns (> {} ms)",
channel, request.getHeader(), nanoSecondsSinceCreated, kafkaConfig.getRequestTimeoutMs());
responseFuture.complete(null); // should we send a timeout exception to the client?
responseFuture.cancel(true);
requestStats.getRequestQueuedLatencyStats().registerFailedEvent(
MathUtils.elapsedNanos(responseAndRequest.getCreatedTimestamp()), TimeUnit.NANOSECONDS);
continue;
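The drain logic above preserves the Kafka protocol's ordering guarantee: responses go out strictly in request order, so a completed future sitting behind an incomplete head must wait. A simplified, self-contained sketch of the peek-then-remove pattern (hypothetical names, Java 16+ records for brevity; the real code also handles expiry and channel state):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class InOrderDrainDemo {
    record Pending(String request, CompletableFuture<String> response) {}

    static final LinkedBlockingQueue<Pending> queue = new LinkedBlockingQueue<>(100);

    // Drain completed responses from the head; stop at the first incomplete one
    // so responses always leave in request order.
    static void drain() {
        while (true) {
            Pending head = queue.peek();
            if (head == null || !head.response().isDone()) {
                break; // empty, or the oldest request is still in flight
            }
            // remove() returns false if another thread already took this element.
            if (queue.remove(head)) {
                System.out.println("flush response for " + head.request());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Pending a = new Pending("a", new CompletableFuture<>());
        Pending b = new Pending("b", new CompletableFuture<>());
        queue.put(a);
        queue.put(b);
        b.response().complete("B"); // completes out of order
        drain();                    // flushes nothing: head "a" is not done yet
        a.response().complete("A");
        drain();                    // flushes "a" then "b", in order
    }
}
```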
@@ -273,9 +273,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
protected void close() {
if (isActive.getAndSet(false)) {
log.info("close channel {}", ctx.channel());
groupCoordinator.getOffsetAcker().close(groupIds);
super.close();
groupCoordinator.getOffsetAcker().close(groupIds);
topicManager.close();
String clientHost = ctx.channel().remoteAddress().toString();
if (currentConnectedGroup.containsKey(clientHost)){
@@ -762,6 +761,10 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
int timeoutMs = produceRequest.timeout();
Runnable complete = () -> {
topicPartitionNum.set(0);
if (resultFuture.isDone()) {
// This callback may be triggered again by DelayedProduceAndFetch
return;
}
// add the topicPartition with a timeout error if it does not exist in responseMap
produceRequest.partitionRecordsOrFail().keySet().forEach(topicPartition -> {
if (!responseMap.containsKey(topicPartition)) {
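The isDone() guard added above makes the timeout Runnable a no-op when DelayedProduceAndFetch has already completed the future, so the timeout path does not rebuild a response that was already answered. A minimal sketch of the pattern (demo names, not the project's API):

```java
import java.util.concurrent.CompletableFuture;

public class CompleteOnceDemo {
    public static void main(String[] args) {
        CompletableFuture<String> resultFuture = new CompletableFuture<>();
        resultFuture.complete("normal response");   // the fast path wins

        Runnable timeoutFallback = () -> {
            if (resultFuture.isDone()) {
                return; // already completed elsewhere; skip building a timeout response
            }
            resultFuture.complete("timeout response");
        };
        timeoutFallback.run();

        System.out.println(resultFuture.join());    // prints "normal response"
    }
}
```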
@@ -227,6 +227,16 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
int timeoutMs = fetchRequestRequest.maxWait();
Runnable complete = () -> {
topicPartitionNum.set(0);
if (resultFuture.isCancelled()) {
// The request was cancelled by KafkaCommandDecoder because the channel was closed or the request expired,
// so the Netty buffers should be released.
decodeResults.forEach(DecodeResult::release);
return;
}
if (resultFuture.isDone()) {
// This callback may be triggered again by DelayedProduceAndFetch
return;
}
// add the topicPartition with a timeout error if it does not exist in responseData
fetchRequestRequest.fetchData().keySet().forEach(topicPartition -> {
if (!responseData.containsKey(topicPartition)) {
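When the fetch future is cancelled, nobody will ever write the decoded buffers to the channel, so they must be released back to the allocator or they leak; that is what the decodeResults.forEach(DecodeResult::release) above does. A generic sketch of releasing reference-counted Netty buffers after cancellation (Unpooled used for a runnable demo; DecodeResult's internals are not shown in this diff):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReleaseOnCancelDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(64);          // refCnt == 1 after allocation
        List<ByteBuf> decodeResults = List.of(buf);

        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        resultFuture.cancel(true);

        if (resultFuture.isCancelled()) {
            // The response will never be written, so free the buffers now.
            decodeResults.forEach(ByteBuf::release);
        }
        System.out.println("refCnt after release: " + buf.refCnt()); // prints 0
    }
}
```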

This file was deleted.

@@ -15,7 +15,6 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
@@ -99,8 +98,7 @@ void testSimpleProduceAndConsume(String topic) {
log.info("Successfully send {} to {}-partition-{}",
key, recordMetadata.topic(), recordMetadata.partition());
} else {
log.error("Failed to send {}", key);
fail("Failed to send " + key);
log.error("Failed to send {}: {}", key, e.getMessage());
}
});
}