
[Bug][broker] Many confusing ERROR logs in the broker #23022

Closed
3 tasks done
TakaHiR07 opened this issue Jul 11, 2024 · 3 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@TakaHiR07
Contributor

TakaHiR07 commented Jul 11, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

master

Minimal reproduce step

A simple unit test in ReaderTest can reproduce the ERROR log:

    @Test
    public void testRead() throws Exception {
        cleanup();
        conf.setDispatcherReadFailureBackoffInitialTimeInMs(1000);
        setup();

        String topic = "persistent://my-property/my-ns/my-reader-topic";
        boolean enableBatch = false;

        int numKeys = 10;

        Set<String> keys = publishMessages(topic, numKeys, enableBatch);
        Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .readerName(subscription)
                .create();

        while (reader.hasMessageAvailable()) {
            Message<byte[]> message = reader.readNext();
            Assert.assertTrue(keys.remove(message.getKey()));
        }
        reader.close();

        Thread.sleep(2000);
    }

What did you expect to see?

The broker should not log this confusing error, since everything is working as expected.

What did you see instead?

To fix a NonDurableCursorImpl OOM problem, PR #22454 and several earlier PRs modified the read path.

The following is the modified, relevant code:

private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Object ctx) {
    try {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
        }
        if (isClosed()) {
            callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
            return;
        }
        // ... (rest of method elided)

public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
                                           Object ctx, Position maxPosition,
                                           Predicate<Position> skipCondition) {
    checkArgument(maxEntries > 0);
    if (isClosed()) {
        callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
        return;
    }
    int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);
    if (hasMoreEntries()) {
        // If we have available entries, we can read them immediately
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
        }
        asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
                maxPosition, skipCondition);
    } else {
        // Skip deleted entries.
        skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
        OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
                ctx, maxPosition, skipCondition);
        if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
            op.recycle();
            callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Deferring retry of read at position {}", ledger.getName(), name, op.readPosition);
        }
        // Check again for new entries after the configured time, then if still no entries are available register
        // to be notified
        if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
            ledger.getScheduledExecutor()
                    .schedule(() -> checkForNewEntries(op, callback, ctx),
                            getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
        } else {
            // If there's no delay, check directly from the same thread
            checkForNewEntries(op, callback, ctx);
        }
    }
}

The root cause of the error log:

If the reader calls readNext() and then closes, and hasMoreEntries() is false, the read enters checkForNewEntries() with the default 10 ms delay. During that window the nonDurableCursor's state changes to closed, so after the 10 ms delay checkForNewEntries() fails the read with CursorAlreadyClosedException.

The execution order is: asyncReadEntriesWithSkipOrWait -> cursor becomes closed -> after 10 ms delay -> checkForNewEntries

PersistentDispatcherSingleActiveConsumer then logs Error reading entries at 5:1 : Cursor was already closed - Retrying to read in 1.0 seconds, and one second later logs Skipping read retry: Current Consumer null, havePendingRead false.
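The execution order above can be reproduced in isolation with a minimal sketch using plain JDK executors (this is an illustration of the race only, not Pulsar code; `DelayedCheckRace`, `runRace`, and `cursorClosed` are names invented here): a deferred check is scheduled while no entries are available, the cursor closes inside the delay window, and the delayed check can only observe the closed cursor and fail the callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class DelayedCheckRace {

    /** Simulates the deferred checkForNewEntries() racing with cursor close. */
    static String runRace() throws InterruptedException {
        AtomicBoolean cursorClosed = new AtomicBoolean(false);
        AtomicReference<String> callbackResult = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // asyncReadEntriesWithSkipOrWait: no entries available, so a check is
        // deferred by the (default 10 ms) new-entries check delay.
        scheduler.schedule(() -> {
            // checkForNewEntries: by the time this fires, close() has already won.
            if (cursorClosed.get()) {
                callbackResult.set("Cursor was already closed");
            } else {
                callbackResult.set("read retried");
            }
            done.countDown();
        }, 10, TimeUnit.MILLISECONDS);

        // reader.close() lands inside the delay window.
        cursorClosed.set(true);

        done.await(5, TimeUnit.SECONDS);
        scheduler.shutdown();
        return callbackResult.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The delayed check observes the closed cursor and fails the callback.
        System.out.println(runRace());
    }
}
```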

It seems that once the nonDurableCursor is closed, there is no need to log the error, and no need to schedule another readEntry. Besides, we should also consider whether, when the cursor is durable, the readEntry should be scheduled again.

The current readEntry code still has some unreasonable areas.
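One possible direction could be sketched as a small decision helper (a hypothetical illustration, not Pulsar's actual code: `RetryGuard` and `shouldRetryRead` are invented names, and `IllegalStateException` stands in for `CursorAlreadyClosedException`): the dispatcher's failure handling would suppress both the ERROR log and the backoff-scheduled retry once the cursor is closed, since a retry against a closed cursor can never succeed.

```java
public class RetryGuard {

    /**
     * Hypothetical decision helper: should the dispatcher log an ERROR and
     * schedule a backoff retry after a failed read?
     * IllegalStateException stands in for CursorAlreadyClosedException here.
     */
    static boolean shouldRetryRead(Throwable error, boolean cursorClosed) {
        // Once the cursor is closed, a retry can never succeed, so skip both
        // the ERROR log and the scheduled retry.
        if (cursorClosed || error instanceof IllegalStateException) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Transient failure on a live cursor: log and retry with backoff.
        System.out.println(shouldRetryRead(new RuntimeException("read failed"), false));
        // Cursor already closed: stay quiet and do not reschedule.
        System.out.println(shouldRetryRead(new IllegalStateException("Cursor was already closed"), true));
    }
}
```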

This is the log output when reproducing the issue; note the confusing ERROR log near the end.

2024-07-11T16:44:31,915+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:61363] Created subscription on topic persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d
2024-07-11T16:44:31,915+0800 [pulsar-client-io-102-4] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Subscribed to topic on localhost/127.0.0.1:61358 -- consumer: 0
2024-07-11T16:44:31,916+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Trigger new read after receiving flow control message
2024-07-11T16:44:31,916+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Schedule read of 100 messages
2024-07-11T16:44:31,916+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Read entries immediately
2024-07-11T16:44:31,919+0800 [PulsarTestContext-executor-OrderedExecutor-0-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Filtering entries [5:0..5:0] - alreadyDeleted: []
2024-07-11T16:44:31,920+0800 [PulsarTestContext-executor-OrderedExecutor-0-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] No filtering needed for entries [5:0..5:0]
2024-07-11T16:44:31,920+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Got messages: 1
2024-07-11T16:44:31,926+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Schedule read of 100 messages
2024-07-11T16:44:31,926+0800 [broker-topic-workers-OrderedExecutor-7-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Deferring retry of read at position 5:1
2024-07-11T16:44:31,929+0800 [pulsar-io-75-4] DEBUG org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Cumulative ack on 5:0
2024-07-11T16:44:31,929+0800 [pulsar-io-75-4] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] Mark delete cursor reader-2b31818c8d up to position: 5:0
2024-07-11T16:44:31,929+0800 [pulsar-io-75-4] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] Moved ack position from: 5:-1 to: 5:0 -- skipped: 1
2024-07-11T16:44:31,932+0800 [pulsar-io-75-4] DEBUG org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Mark deleted messages to position 5:0 from position 5:-1
2024-07-11T16:44:31,932+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:61363] Closing consumer: consumerId=0
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Cancel pending read request
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Successfully closed subscription [NonDurableCursorImpl{ledger=my-property/my-ns/persistent/my-reader-topic, cursor=reader-2b31818c8d, ackPos=5:0, readPos=5:1}]
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic][reader-2b31818c8d] Successfully closed dispatcher for reader
2024-07-11T16:44:31,933+0800 [pulsar-io-75-4] DEBUG org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://my-property/my-ns/my-reader-topic] [reader-2b31818c8d] [reader-sub] Removed consumer -- count: 0
2024-07-11T16:44:31,934+0800 [pulsar-io-75-4] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:61363] Closed consumer, consumerId=0
2024-07-11T16:44:31,934+0800 [pulsar-client-io-102-4] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://my-property/my-ns/my-reader-topic] [reader-2b31818c8d] Closed consumer
2024-07-11T16:44:31,935+0800 [pulsar-74-1] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] Consumer reader-2b31818c8d cursor ml-entries: 1 -- deleted-counter: 1 other counters: mdPos 5:0 rdPos 5:1
2024-07-11T16:44:31,938+0800 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] DEBUG org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-ns/persistent/my-reader-topic] [reader-2b31818c8d] Re-trying the read at position null
2024-07-11T16:44:31,938+0800 [broker-topic-workers-OrderedExecutor-7-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Error reading entries at 5:1 : Cursor was already closed - Retrying to read in 1.0 seconds
2024-07-11T16:44:32,945+0800 [broker-topic-workers-OrderedExecutor-7-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://my-property/my-ns/my-reader-topic / reader-2b31818c8d-Consumer{subscription=PersistentSubscription{topic=persistent://my-property/my-ns/my-reader-topic, name=reader-2b31818c8d}, consumerId=0, consumerName=reader-sub, address=[id: 0x80218327, L:/127.0.0.1:61358 - R:/127.0.0.1:61363] [SR:127.0.0.1, state:Connected]}] Skipping read retry: Current Consumer null, havePendingRead false

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@TakaHiR07 TakaHiR07 added the type/bug The PR fixed a bug or issue reported a bug label Jul 11, 2024
@TakaHiR07
Contributor Author

@Technoboy- @codelipenghui @lhotari Can you take a look at this issue?

@dao-jun
Member

dao-jun commented Jul 15, 2024

#22751

@dao-jun
Member

dao-jun commented Aug 17, 2024

Closing the issue because #22751 was merged.

@dao-jun dao-jun closed this as completed Aug 17, 2024