This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

fix issue #252 bundle unload bug #404

Merged

hangc0276
Collaborator

Fix #252

Motivation

When a bundle unload is triggered, the consumerTopicManagers cache is not evicted, so the next fetch request gets the old KafkaTopicConsumerManager instance. However, after the bundle unload, the producer, consumer, and managedLedger of every topic in that bundle are closed. Reading messages through the old KafkaTopicConsumerManager instance therefore fails with a "managedLedger has been closed" exception.

Changes

  1. Change the consumerTopicManagers, topics, and references maps to static ConcurrentHashMap fields.
  2. Evict the related cache entries for topics whose bundle triggered an unload.
  3. Turn on DistributedClusterTest.
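
The first two changes can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `ConsumerManager`, `TopicManagerCache`, `getOrCreate`, and `evict` are hypothetical names, and the mapping from a bundle to its topics is reduced to a caller-supplied predicate. The point is only that a shared static `ConcurrentHashMap` can be purged when a bundle unloads, so the next fetch builds a fresh manager instead of reusing one whose managed ledger is closed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical stand-in for KafkaTopicConsumerManager; it only tracks open/closed.
class ConsumerManager {
    final String topic;
    volatile boolean closed;

    ConsumerManager(String topic) {
        this.topic = topic;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical cache shared by all connections (hence the static map).
class TopicManagerCache {
    private static final ConcurrentHashMap<String, ConsumerManager> CACHE =
            new ConcurrentHashMap<>();

    // Return the cached manager for a topic, creating it on first use.
    static ConsumerManager getOrCreate(String topic) {
        return CACHE.computeIfAbsent(topic, ConsumerManager::new);
    }

    // Called when a bundle unloads: close and drop every entry the predicate matches.
    // Returns the number of evicted entries.
    static int evict(Predicate<String> inUnloadedBundle) {
        int evicted = 0;
        for (String topic : CACHE.keySet()) {
            if (inUnloadedBundle.test(topic)) {
                ConsumerManager removed = CACHE.remove(topic);
                if (removed != null) {
                    removed.close();
                    evicted++;
                }
            }
        }
        return evicted;
    }
}
```

With this shape, a lookup after `evict` returns a new instance, which is the behavior the fix relies on.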

@BewareMyPower
Collaborator

@hangc0276 Could you take a look at the test failure? It looks like the test is still unstable after the change. I think it may not be caused by the uncleaned static cache.

@hangc0276
Collaborator Author

> @hangc0276 Could you take a look at the test failure? It looks like the test is still unstable after the change. I think it may not be caused by the uncleaned static cache.

@BewareMyPower OK, maybe other bugs cause this problem. It looks like it is caused by load balancing; I will address it in the next few days.

@hangc0276 hangc0276 force-pushed the chenhang/fix_bundle_unload_bug branch from de67ff4 to d5051c0 Compare April 2, 2021 06:39
@hangc0276
Collaborator Author

hangc0276 commented Apr 2, 2021

When running DistributedClusterTest#testMutiBrokerAndCoordinator, the test blocks in publishMessages, and it blocks again in the retry round.

The code in ManagedLedgerImpl.java is as follows:

public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) {
    log.info("[hangc] start asyncAddEntry v1...");
    if (log.isDebugEnabled()) {
        log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
    }

    OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);

    log.info("[hangc] start execute name: {}", name);
    // Jump to specific thread to avoid contention from writers writing from different threads
    // Probe task on the same key: logs only, to check whether the ordered thread runs tasks at all
    executor.executeOrdered(name, safeRun(() -> log.info("pre exec...")));
    executor.executeOrdered(name, safeRun(() -> {
        log.info("[hangc] enter thread exec internalAsyncAddEntry...");
        internalAsyncAddEntry(addOperation);
    }));
}
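
To make the "jump to specific thread" comment concrete, here is a minimal sketch of key-ordered execution. It is an assumption-laden illustration, not the executor that ManagedLedgerImpl actually uses: `KeyOrderedExecutor` and `shutdownAndWait` are hypothetical names, and the key is mapped to a worker by a plain hash. It shows why two tasks submitted with the same name run on the same thread in submission order, which also means a task that never returns would hold up everything queued behind it on that key.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical key-ordered executor: each key always maps to the same single thread.
class KeyOrderedExecutor {
    private final ExecutorService[] workers;

    KeyOrderedExecutor(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Tasks submitted with the same key run one after another, in submission order.
    void executeOrdered(String key, Runnable task) {
        workers[Math.floorMod(key.hashCode(), workers.length)].execute(task);
    }

    // Stop accepting tasks and wait (up to 5 seconds per worker) for queued ones to finish.
    void shutdownAndWait() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Under this model, a log-only probe submitted on the same key before the real task tells you whether that key's thread is still draining its queue.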

@hangc0276 hangc0276 force-pushed the chenhang/fix_bundle_unload_bug branch 4 times, most recently from 38e6731 to 788d5db Compare April 13, 2021 03:08
@BewareMyPower
Collaborator

BewareMyPower commented Apr 19, 2021

@hangc0276 Could you take a look at the failed testOneBrokerShutdown?

Also you can rebase to latest master because we ignored the unstable tests, see #444

@hangc0276 hangc0276 force-pushed the chenhang/fix_bundle_unload_bug branch from 5238a0b to ef4f94c Compare April 22, 2021 05:00
@hangc0276 hangc0276 requested a review from BewareMyPower April 22, 2021 07:02
@BewareMyPower BewareMyPower merged commit 31e9295 into streamnative:master Apr 22, 2021
BewareMyPower added a commit to BewareMyPower/openmessaging-benchmark that referenced this pull request Apr 27, 2021
BewareMyPower added a commit that referenced this pull request Jul 6, 2021
This PR migrates #404 and #570 because they are too hard to cherry-pick.

In addition, Pulsar 2.7.2.6 introduced API changes, but they only affect some unused classes that were removed from the master branch in #387. This PR removes those classes so that KoP 2.7.2.6 stays compatible with Pulsar 2.7.0.
BewareMyPower added a commit that referenced this pull request Jul 23, 2021
This PR migrates #404 because it's too hard to cherry-pick it.
BewareMyPower added a commit that referenced this pull request Jul 23, 2021
Fixes #618 

### Motivation

See #618 (comment) for the deadlock analysis.

### Modifications
- Use `ConcurrentHashMap` instead of `ConcurrentLongHashMap`. Though this bug may already be fixed in apache/pulsar#9787, the `ConcurrentHashMap` from the Java standard library is more reliable. The possible performance improvement from `ConcurrentLongHashMap` still needs to be proved.
- Use an `AtomicBoolean` as `KafkaTopicConsumerManager`'s state instead of a read-write lock, so that `close()` no longer blocks while trying to acquire the write lock.
- Run a single cursor expire task instead of one task per channel. Since #404 changed `consumerTopicManagers` to a static field, there's no reason to run a task for each connection.
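
The second modification above can be sketched as follows. This is a hedged illustration, not the actual `KafkaTopicConsumerManager` code: `FlagGuardedManager`, `read`, and the returned values are hypothetical. It shows the general pattern of guarding a lifecycle with an `AtomicBoolean`, where `close()` is a single `compareAndSet` and so never waits on a lock held by readers.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical manager whose lifecycle is guarded by a flag instead of a read-write lock.
class FlagGuardedManager {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Readers check the flag without taking any lock.
    String read() {
        if (closed.get()) {
            return null; // caller treats null as "manager closed, look it up again"
        }
        return "entry";
    }

    // Only the first caller wins the compareAndSet; close() never waits for readers.
    // Returns true if this call performed the close, false if it was already closed.
    boolean close() {
        return closed.compareAndSet(false, true);
    }
}
```

The trade-off is that a read which passed the check can still overlap with a concurrent `close()`, so whatever the reader touches afterwards has to tolerate an already-closed resource.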
BewareMyPower added a commit to BewareMyPower/kop that referenced this pull request Jul 25, 2021
BewareMyPower added a commit that referenced this pull request Jul 25, 2021
Successfully merging this pull request may close these issues.

[BUG] testMutiBrokerUnloadReload cannot pass