Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix consumer stops receiving messages when processing large backlogs #22454

Merged
merged 5 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,11 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob
log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
}

if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
return;
}

if (!hasMoreEntries()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Still no entries available. Register for notification", ledger.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,7 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
+ consumerName), ctx);
return;
} else if (!cursor.isDurable()) {
cursor.setState(ManagedCursorImpl.State.Closed);
cursors.removeCursor(consumerName);
deactivateCursorByName(consumerName);
callback.deleteCursorComplete(ctx);
Expand Down Expand Up @@ -3814,13 +3815,7 @@ public void removeWaitingCursor(ManagedCursor cursor) {
}

/**
 * Adds the given cursor to {@code waitingCursors}.
 *
 * NOTE(review): this method is shown inside a rendered diff hunk (-3814,13 +3815,7) whose +/- markers
 * were stripped. The {@code instanceof NonDurableCursorImpl} conditional appears to be the removed
 * pre-change body, and the final unconditional add its one-line replacement. Confirm against the
 * actual file before relying on this text.
 *
 * @param cursor the cursor to add to {@code waitingCursors}
 */
public void addWaitingCursor(ManagedCursorImpl cursor) {
// Presumably the pre-change logic: a non-durable cursor is added only while it is active.
if (cursor instanceof NonDurableCursorImpl) {
if (cursor.isActive()) {
this.waitingCursors.add(cursor);
}
} else {
this.waitingCursors.add(cursor);
}
// Presumably the post-change logic: every cursor is added unconditionally. Read literally together
// with the conditional above, any cursor other than an inactive non-durable one would be added twice.
this.waitingCursors.add(cursor);
}

public boolean isCursorActive(ManagedCursor cursor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
topic.getManagedLedger().removeWaitingCursor(cursor);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering about this removal now that the PR has been merged. It seems that, whatever isResetCursor means, the removeWaitingCursor call would be skipped when it is set. @Technoboy- is that a problem?


if (!cursor.isDurable()) {
// If cursor is not durable, we need to clean up the subscription as well. No need to check for active
Expand Down Expand Up @@ -338,11 +337,14 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
if (!isResetCursor) {
try {
topic.getManagedLedger().deleteCursor(cursor.getName());
topic.getManagedLedger().removeWaitingCursor(cursor);
} catch (InterruptedException | ManagedLedgerException e) {
log.warn("[{}] [{}] Failed to remove non durable cursor", topic.getName(), subName, e);
}
}
});
} else {
topic.getManagedLedger().removeWaitingCursor(cursor);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -113,6 +114,11 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

/**
 * Applies the parent class's configuration and then sets the managed-ledger cursor backlogged
 * threshold to 10 for the tests in this class.
 *
 * @throws Exception if the parent configuration step fails
 */
@Override
protected void doInitConf() throws Exception {
    super.doInitConf();
    conf.setManagedLedgerCursorBackloggedThreshold(10);
}

/**
* Test validates that broker cleans up topic which failed to unload while bundle unloading.
*
Expand Down Expand Up @@ -681,7 +687,7 @@ public void testAddWaitingCursorsForNonDurable() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
final ManagedCursor spyCursor= spy(ledger.newNonDurableCursor(PositionImpl.LATEST, "sub-2"));
doAnswer((invocation) -> {
Thread.sleep(10_000);
Thread.sleep(5_000);
invocation.callRealMethod();
return null;
}).when(spyCursor).asyncReadEntriesOrWait(any(int.class), any(long.class),
Expand All @@ -704,8 +710,57 @@ public void testAddWaitingCursorsForNonDurable() throws Exception {
Awaitility.await()
.pollDelay(5, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.atMost(30, TimeUnit.SECONDS)
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
.untilAsserted(() -> {
assertEquals(ledger.getWaitingCursorsCount(), 0);
});
}

/**
 * Publishes 100 messages, drains them through a non-durable exclusive subscription that starts from
 * the earliest position, pauses for three seconds, publishes another 100 messages and checks that
 * the same consumer receives those as well (200 messages in total).
 */
@Test
public void testAddWaitingCursorsForNonDurable2() throws Exception {
    final String namespace = "prop/ns-test";
    admin.namespaces().createNamespace(namespace, 2);
    final String topic = "persistent://prop/ns-test/testAddWaitingCursors2";
    admin.topics().createNonPartitionedTopic(topic);

    // Create the durable shared subscription "sub-1" and close its consumer immediately.
    pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionMode(SubscriptionMode.Durable)
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionName("sub-1")
            .subscribe()
            .close();

    @Cleanup
    final Producer<String> publisher = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(false)
            .topic(topic)
            .create();
    // First batch: fire-and-forget sends, the returned futures are not awaited.
    for (int seq = 0; seq < 100; seq++) {
        publisher.sendAsync("test-" + seq);
    }

    @Cleanup
    final Consumer<String> nonDurableConsumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionMode(SubscriptionMode.NonDurable)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Exclusive)
            .subscriptionName("sub-2")
            .subscribe();

    // Drain until a receive call times out (returns null) after 3 seconds.
    int received = 0;
    for (Message<String> next = nonDurableConsumer.receive(3, TimeUnit.SECONDS);
            next != null;
            next = nonDurableConsumer.receive(3, TimeUnit.SECONDS)) {
        nonDurableConsumer.acknowledge(next);
        received++;
    }
    Assert.assertEquals(received, 100);

    // Pause before publishing the second batch.
    Thread.sleep(3_000);
    for (int seq = 0; seq < 100; seq++) {
        publisher.sendAsync("test-" + seq);
    }

    // Drain again, this time with a 5 second receive timeout; the counter keeps accumulating.
    for (Message<String> next = nonDurableConsumer.receive(5, TimeUnit.SECONDS);
            next != null;
            next = nonDurableConsumer.receive(5, TimeUnit.SECONDS)) {
        nonDurableConsumer.acknowledge(next);
        received++;
    }
    Assert.assertEquals(received, 200);
}
}
Loading