This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Use LinkedBlockingQueue as the container of ResponseAndRequest #500
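In short: the decoder previously used an unbounded `ConcurrentLinkedQueue` (`responseQueue`) to keep responses in request order plus a separate `FakeArrayBlockingQueue` (`requestQueue`) to cap in-flight requests; this change deletes `FakeArrayBlockingQueue` and merges both roles into a single bounded `LinkedBlockingQueue`. A minimal sketch of the resulting pattern, using hypothetical names (`Pending`, `enqueue`, `flushCompleted`) rather than the actual KoP classes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: one bounded queue provides both backpressure and in-order responses.
public class OrderedResponseQueue {

    static class Pending {
        final String request;                      // stand-in for KafkaHeaderAndRequest
        final CompletableFuture<String> response;  // stand-in for the response future
        Pending(String request) {
            this.request = request;
            this.response = new CompletableFuture<>();
        }
    }

    // Bounded: put() blocks once the capacity (maxQueuedRequests) is reached.
    private final LinkedBlockingQueue<Pending> queue = new LinkedBlockingQueue<>(100);

    // Inbound path: blocks when the queue is full, which is the capacity limit.
    Pending enqueue(String request) throws InterruptedException {
        Pending pending = new Pending(request);
        queue.put(pending);
        return pending;
    }

    // Called when any response completes: only the head may be written,
    // so responses always go out in the order the requests arrived.
    void flushCompleted() {
        Pending head;
        while ((head = queue.peek()) != null && head.response.isDone()) {
            queue.remove(head);
            if (head.response.isCompletedExceptionally()) {
                System.out.println("write error response for " + head.request);
            } else {
                System.out.println("write response for " + head.request
                        + ": " + head.response.getNow(null));
            }
        }
    }
}
```

The key property is that `put` blocks the inbound thread once the queue is full (replacing `FakeArrayBlockingQueue`), while peek-then-remove on the head preserves response ordering (replacing the old `responseQueue`).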

This file was deleted.

@@ -16,16 +16,16 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;

import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.naming.AuthenticationException;
@@ -34,6 +34,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -55,11 +56,8 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
protected SocketAddress remoteAddress;
@Getter
protected AtomicBoolean isActive = new AtomicBoolean(false);
// Queue to make response get responseFuture in order.
private final Queue<ResponseAndRequest> responseQueue = Queues.newConcurrentLinkedQueue();

// Queue to make request at the given capacity.
private final FakeArrayBlockingQueue requestQueue;
// Queue to keep responses in request order and to cap the number of pending requests
private final LinkedBlockingQueue<ResponseAndRequest> requestQueue;

protected final RequestStats requestStats;
@Getter
@@ -68,7 +66,7 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
public KafkaCommandDecoder(StatsLogger statsLogger, KafkaServiceConfiguration kafkaConfig) {
this.requestStats = new RequestStats(statsLogger);
this.kafkaConfig = kafkaConfig;
this.requestQueue = new FakeArrayBlockingQueue(kafkaConfig.getMaxQueuedRequests());
this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests());
}

@Override
@@ -86,9 +84,19 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}

protected void close() {
// Clear the request queue
log.info("close channel {} with {} pending responses", ctx.channel(), requestQueue.size());
while (true) {
final ResponseAndRequest responseAndRequest = requestQueue.poll();
if (responseAndRequest != null) {
// Trigger writeAndFlushResponseToClient immediately, but it will do nothing because isActive is false
responseAndRequest.getResponseFuture().cancel(true);
} else {
// queue is empty
break;
}
}
ctx.close();
requestQueue.clear();
responseQueue.clear();
}

@Override
@@ -120,7 +128,7 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg,
}
}

protected ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
protected static ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
try (KafkaHeaderAndResponse kafkaHeaderAndResponse =
KafkaHeaderAndResponse.responseForRequest(request, response)) {
// Lowering Client API_VERSION request to the oldest API_VERSION KoP supports, this is to make \
@@ -154,7 +162,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
remoteAddress = channel.remoteAddress();
}

requestQueue.put();
requestStats.getRequestQueueSize().incrementAndGet();

final long timeBeforeParse = MathUtils.nowInNano();
@@ -171,18 +178,21 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
responseFuture.whenComplete((response, e) -> {
// Do nothing if this is triggered after responseFuture has expired
if (response == null) {
if (e instanceof CancellationException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Request {} is cancelled",
ctx.channel(), kafkaHeaderAndRequest.getHeader());
}
// The response future was cancelled by the `close` or `writeAndFlushResponseToClient` method, so there's
// no need to call `writeAndFlushResponseToClient` again.
return;
}
ctx.channel().eventLoop().execute(() -> {
writeAndFlushResponseToClient(channel);
// release a slot in requestQueue
requestQueue.poll(); // poll() does not throw here, so it cannot kill the eventLoop thread
});
});

responseQueue.add(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
// May block until there is room in the queue for the request.
requestQueue.put(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
requestStats.getResponseQueueSize().incrementAndGet();

if (!isActive.get()) {
@@ -294,9 +304,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
protected void writeAndFlushResponseToClient(Channel channel) {
// loop from first responseFuture.
while (isActive.get()) {
final ResponseAndRequest responseAndRequest = responseQueue.peek();
final ResponseAndRequest responseAndRequest = requestQueue.peek();
if (responseAndRequest == null) {
// responseQueue is empty
// requestQueue is empty
break;
}

@@ -313,7 +323,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
}
break;
} else {
if (responseQueue.remove(responseAndRequest)) {
if (requestQueue.remove(responseAndRequest)) {
responseAndRequest.updateStats(requestStats);
requestStats.getResponseQueueSize().decrementAndGet();
} else { // it has been removed by another thread, skip this element
@@ -327,48 +337,58 @@ protected void writeAndFlushResponseToClient(Channel channel) {
}

final KafkaHeaderAndRequest request = responseAndRequest.getRequest();
// case 2: responseFuture is expired
if (expired) {
log.error("[{}] request {} is not completed for {} ns (> {} ms)",
channel, request.getHeader(), nanoSecondsSinceCreated, kafkaConfig.getRequestTimeoutMs());
responseFuture.complete(null); // TODO: should a timeout exception be sent to the client?
requestStats.getRequestQueuedLatencyStats().registerFailedEvent(
MathUtils.elapsedNanos(responseAndRequest.getCreatedTimestamp()), TimeUnit.NANOSECONDS);
continue;
}

// case 3: responseFuture is completed exceptionally
// case 2: responseFuture is completed exceptionally
if (responseFuture.isCompletedExceptionally()) {
responseFuture.exceptionally(e -> {
log.error("[{}] request {} completed exceptionally", channel, request.getHeader(), e);
channel.writeAndFlush(request.createErrorResponse(e));
requestStats.getRequestQueuedLatencyStats().registerFailedEvent(
MathUtils.elapsedNanos(responseAndRequest.getCreatedTimestamp()), TimeUnit.NANOSECONDS);
return null;
}); // TODO: send the exception to the client?
continue;
}

// case 4: responseFuture is completed normally
responseFuture.thenApply(response -> {
if (log.isDebugEnabled()) {
log.debug("Write kafka cmd to client."
+ " request content: {}"
+ " responseAndRequest content: {}",
request, response.toString(request.getRequest().version()));
}

final ByteBuf result = responseToByteBuf(response, request);
channel.writeAndFlush(result).addListener(future -> {
if (response instanceof ResponseCallbackWrapper) {
((ResponseCallbackWrapper) response).responseComplete();
// case 3: responseFuture is completed normally
if (responseFuture.isDone()) {
responseFuture.thenAccept(response -> {
if (response == null) {
// The response should never be null; check it only for safety
log.error("[{}] Unexpected null completed future for request {}",
ctx.channel(), request.getHeader());
channel.writeAndFlush(request.createErrorResponse(new ApiException("response is null")));
return;
}
if (!future.isSuccess()) {
log.error("[{}] Failed to write {}", channel, request.getHeader(), future.cause());
if (log.isDebugEnabled()) {
log.debug("Write kafka cmd to client."
+ " request content: {}"
+ " responseAndRequest content: {}",
request, response.toString(request.getRequest().version()));
}

final ByteBuf result = responseToByteBuf(response, request);
channel.writeAndFlush(result).addListener(future -> {
if (response instanceof ResponseCallbackWrapper) {
((ResponseCallbackWrapper) response).responseComplete();
}
if (!future.isSuccess()) {
log.error("[{}] Failed to write {}", channel, request.getHeader(), future.cause());
}
});
});
return null;
});
}

// case 4: responseFuture is expired
if (expired) {
log.error("[{}] request {} is not completed for {} ns (> {} ms)",
channel, request.getHeader(), nanoSecondsSinceCreated, kafkaConfig.getRequestTimeoutMs());
responseFuture.cancel(true);
channel.writeAndFlush(
request.createErrorResponse(new ApiException("request expired on the server side")));
requestStats.getRequestQueuedLatencyStats().registerFailedEvent(
MathUtils.elapsedNanos(responseAndRequest.getCreatedTimestamp()), TimeUnit.NANOSECONDS);
}
}
}

@@ -501,8 +521,8 @@ public String getClientHost() {
}
}

ByteBuf getBuffer() {
return this.buffer;
public ByteBuf createErrorResponse(Throwable e) {
return responseToByteBuf(request.getErrorResponse(e), this);
}

public String toString() {
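The reworked `close()` above drains the queue and cancels every pending response future. This works because cancelling a `CompletableFuture` completes it with a `CancellationException`, which the `whenComplete` callback registered in `channelRead` detects and turns into a no-op instead of a write. A standalone demo of that mechanism (plain JDK code, not KoP code):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

public class CancelDemo {
    public static void main(String[] args) {
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        responseFuture.whenComplete((response, e) -> {
            if (e instanceof CancellationException) {
                // Mirrors the branch added in channelRead: nothing is written back.
                System.out.println("cancelled by close(), skipping the response");
                return;
            }
            System.out.println("would write: " + response);
        });
        responseFuture.cancel(true); // prints the "cancelled" branch
    }
}
```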
@@ -273,9 +273,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
protected void close() {
if (isActive.getAndSet(false)) {
log.info("close channel {}", ctx.channel());
groupCoordinator.getOffsetAcker().close(groupIds);
super.close();
groupCoordinator.getOffsetAcker().close(groupIds);
topicManager.close();
String clientHost = ctx.channel().remoteAddress().toString();
if (currentConnectedGroup.containsKey(clientHost)){
@@ -762,6 +761,10 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
int timeoutMs = produceRequest.timeout();
Runnable complete = () -> {
topicPartitionNum.set(0);
if (resultFuture.isDone()) {
// It may be triggered again in DelayedProduceAndFetch
return;
}
// add the topicPartition with a timeout error if it is not already in responseMap
produceRequest.partitionRecordsOrFail().keySet().forEach(topicPartition -> {
if (!responseMap.containsKey(topicPartition)) {
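The `resultFuture.isDone()` guard added to the produce path makes the completion callback idempotent: both the normal completion path and `DelayedProduceAndFetch` can invoke it, and only the first invocation should fill in timeout errors and complete the future. A minimal sketch of the guard, with hypothetical names:

```java
import java.util.concurrent.CompletableFuture;

public class IdempotentComplete {
    private final CompletableFuture<String> resultFuture = new CompletableFuture<>();

    // Completion callback that may fire more than once; only the first run wins.
    private final Runnable complete = () -> {
        if (resultFuture.isDone()) {
            // Already completed, e.g. re-triggered by a delayed operation: do nothing.
            return;
        }
        resultFuture.complete("response with timeout errors for missing partitions");
    };

    public static void main(String[] args) {
        IdempotentComplete demo = new IdempotentComplete();
        demo.complete.run(); // completes the future
        demo.complete.run(); // second invocation is a no-op
        System.out.println(demo.resultFuture.join());
    }
}
```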
@@ -227,6 +227,16 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch,
int timeoutMs = fetchRequestRequest.maxWait();
Runnable complete = () -> {
topicPartitionNum.set(0);
if (resultFuture.isCancelled()) {
// The request was cancelled by KafkaCommandDecoder because the channel was closed or the request expired,
// so the Netty buffers should be released.
decodeResults.forEach(DecodeResult::release);
return;
}
if (resultFuture.isDone()) {
// It may be triggered again in DelayedProduceAndFetch
return;
}
// add the topicPartition with a timeout error if it is not already in responseData
fetchRequestRequest.fetchData().keySet().forEach(topicPartition -> {
if (!responseData.containsKey(topicPartition)) {
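The fetch path needs one extra step on cancellation: the decoded Netty buffers must be released explicitly, because the response will never be written and nothing downstream would free them otherwise. A simplified sketch, assuming each `DecodeResult` ultimately wraps a reference-counted `ByteBuf`:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReleaseOnCancel {
    public static void main(String[] args) {
        List<ByteBuf> decodeResults = new ArrayList<>();   // stand-in for List<DecodeResult>
        decodeResults.add(Unpooled.buffer(16));

        CompletableFuture<String> resultFuture = new CompletableFuture<>();
        resultFuture.cancel(true); // e.g. channel closed or request expired

        if (resultFuture.isCancelled()) {
            // Mirrors decodeResults.forEach(DecodeResult::release) in the diff above.
            decodeResults.forEach(ReferenceCountUtil::release);
            System.out.println("released " + decodeResults.size() + " buffers");
        }
    }
}
```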