Skip to content

Commit

Permalink
[fix][broker] Fix wrong double-checked locking for readOnActiveConsum…
Browse files Browse the repository at this point in the history
…erTask in dispatcher (apache#22279)
  • Loading branch information
BewareMyPower authored Mar 16, 2024
1 parent 442595e commit 4e0c145
Showing 1 changed file with 16 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
protected volatile int readBatchSize;
protected final Backoff readFailureBackoff;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
private final Object lockForReadOnActiveConsumerTask = new Object();

private final RedeliveryTracker redeliveryTracker;

Expand Down Expand Up @@ -123,18 +124,23 @@ protected void scheduleReadOnActiveConsumer() {
return;
}

readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
synchronized (lockForReadOnActiveConsumerTask) {
if (readOnActiveConsumerTask != null) {
return;
}
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());

notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
}
}

@Override
Expand Down

0 comments on commit 4e0c145

Please sign in to comment.