-
Notifications
You must be signed in to change notification settings - Fork 138
Use LinkedBlockingQueue as the container of ResponseAndRequest #500
Use LinkedBlockingQueue as the container of ResponseAndRequest #500
Conversation
|
d19fd87
to
e293791
Compare
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
Outdated
Show resolved
Hide resolved
@hangc0276 From your comment, I just thought of that this problem also existed in current code because the I think we need another way to handle this case. |
e293791
to
acbef17
Compare
@hangc0276 I've pushed the fix, PTAL again. |
It looks like after rebasing to master, some tests failed. I'll take a look. |
tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTestBase.java
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
Show resolved
Hide resolved
The |
11c2935
to
74a9590
Compare
@hangc0276 PTAL again |
tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTestBase.java
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
Outdated
Show resolved
Hide resolved
@hangc0276 I adjusted the logic of response future's callback. The key point is that when the future is cancelled, skip There're two main reasons:
If the response future is completed exceptionally or completed with null value, it will be handled in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Motivation
There're
requestQueue
andresponseQueue
in each channel.responseQueue
stores the pendingResponseAndRequest
to make responses be sent in order. However,requestQueue
is just to limit the max number of requests and it's a fake queue because it only maintains a request count.Sometimes an exception may be thrown like
It's because when an empty
requestQueue
calledpoll()
method. It may be caused by theclose()
method, which clears therequestQueue
. However, the pending response future may complete and triggerwriteAndFlushResponseToClient
, which triggers anotherpoll()
call. In addition,writeAndFlushResponseToClient
may not remove any responses fromresponseQueue
but each time it's called therequestQueue.poll()
will be called.Modifications
LinkedBlockingQueue
to storeResponseAndRequest
and limit the max number of requests, use therequestQueue
as its name to avoid misunderstanding.requestQueue
, cancel all response futures and release the response buffer when it's a FETCH response. If the response future is cancelled, don't callwriteAndFlushResponseToClient
in callback.DelayedOperation
's callback because the callback may be triggered twice.