Skip to content

Commit

Permalink
Revert havePendingReplayRead related changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Jan 23, 2025
1 parent 450069a commit 06827df
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,6 @@ public synchronized void readMoreEntries() {
}
return;
}
if (havePendingReplayRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to replay in-progress.", topic.getName(),
getSubscriptionName());
}
return;
}
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
Expand Down Expand Up @@ -386,23 +379,13 @@ public synchronized void readMoreEntries() {
long bytesToRead = calculateResult.getRight();

if (messagesToRead == -1 || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate
// Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
return;
}

Set<Position> messagesToReplayNow =
canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet();
if (!messagesToReplayNow.isEmpty()) {
// before replaying, cancel possible pending read that is waiting for more entries
cancelPendingRead();
if (havePendingRead) {
// skip read since a pending read is already in progress which cannot be cancelled
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping replay read for the topic, Due to pending read in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayNow.size(), consumerList.size());
Expand Down Expand Up @@ -632,6 +615,13 @@ protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits)
}
}

if (havePendingReplayRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
}
return Pair.of(-1, -1L);
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);
bytesToRead = Math.max(bytesToRead, 1);
Expand Down Expand Up @@ -727,12 +717,6 @@ public SubType getType() {
public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
ReadType readType = (ReadType) ctx;
if (readType == ReadType.Normal) {
if (!havePendingRead) {
log.debug("Discarding read entries as there is no pending read");
entries.forEach(Entry::release);
readMoreEntriesAsync();
return;
}
havePendingRead = false;
} else {
havePendingReplayRead = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,23 +560,13 @@ public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInS
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
public synchronized void readMoreEntries() {
havePendingRead = true;
}

@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
public synchronized void readMoreEntries() {
havePendingRead = true;
}

@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
Expand All @@ -588,11 +578,8 @@ protected void reScheduleReadInMs(long readAfterMs) {
consumerMockAvailablePermits.set(0);
dispatcher.addConsumer(consumerMock);


// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
// call the overridden "readMoreEntries" method that sets the "havePendingRead" flag
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
Expand All @@ -601,7 +588,6 @@ protected void reScheduleReadInMs(long readAfterMs) {
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
Expand All @@ -612,7 +598,6 @@ protected void reScheduleReadInMs(long readAfterMs) {
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
Expand All @@ -624,15 +609,13 @@ protected void reScheduleReadInMs(long readAfterMs) {
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
Expand Down Expand Up @@ -660,23 +643,13 @@ public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSub
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
public synchronized void readMoreEntries() {
havePendingRead = true;
}

@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
public synchronized void readMoreEntries() {
havePendingRead = true;
}

@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
Expand All @@ -690,8 +663,6 @@ protected void reScheduleReadInMs(long readAfterMs) {

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
// call the overridden "readMoreEntries" method that sets the "havePendingRead" flag
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
Expand All @@ -700,7 +671,6 @@ protected void reScheduleReadInMs(long readAfterMs) {
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
Expand All @@ -711,7 +681,6 @@ protected void reScheduleReadInMs(long readAfterMs) {
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
Expand All @@ -723,15 +692,13 @@ protected void reScheduleReadInMs(long readAfterMs) {
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
dispatcher.readMoreEntries();
dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
Expand All @@ -753,7 +720,7 @@ public void testNoBackoffDelayWhenDelayedMessages(boolean dispatchMessagesInSubs
AtomicInteger reScheduleReadInMsCalled = new AtomicInteger(0);
AtomicBoolean delayAllMessages = new AtomicBoolean(true);

PersistentDispatcherMultipleConsumers dispatcher;
AbstractPersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
Expand All @@ -765,7 +732,6 @@ protected void reScheduleReadInMs(long readAfterMs) {

@Override
public synchronized void readMoreEntries() {
havePendingRead = true;
readMoreEntriesCalled.incrementAndGet();
}

Expand All @@ -787,7 +753,6 @@ protected void reScheduleReadInMs(long readAfterMs) {

@Override
public synchronized void readMoreEntries() {
havePendingRead = true;
readMoreEntriesCalled.incrementAndGet();
}

Expand Down Expand Up @@ -815,8 +780,6 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
dispatcher.addConsumer(consumerMock);

List<Entry> entries = new ArrayList<>(List.of(EntryImpl.create(1, 1, createMessage("message1", 1))));
dispatcher.readMoreEntries();
readMoreEntriesCalled.set(0);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(reScheduleReadInMsCalled.get(), 0, "reScheduleReadInMs should not be called");
Expand Down

0 comments on commit 06827df

Please sign in to comment.