-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Closed topics won't be removed from the cache #23884
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #23884 +/- ##
============================================
+ Coverage 73.57% 74.15% +0.58%
+ Complexity 32624 32203 -421
============================================
Files 1877 1853 -24
Lines 139502 143659 +4157
Branches 15299 16315 +1016
============================================
+ Hits 102638 106534 +3896
+ Misses 28908 28722 -186
- Partials 7956 8403 +447
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please reduce code duplication
// Don't directly use the topic object to close, because the topicFuture might not | ||
// be completed at that time, which could leave closed topics in the cache(at BrokerService). | ||
CompletableFuture<Optional<Topic>> topicFuture = topic.getBrokerService().getTopics().get(topic.getName()); | ||
if (topicFuture != null) { | ||
topicFuture.thenAccept(t -> t.ifPresent(v -> { | ||
v.close(true).exceptionally(ec -> { | ||
log.error("Close topic {} exception", v.getName(), ec); | ||
return null; | ||
}); | ||
})); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is now duplicated in multiple locations. Instead of adding code duplication,
I'd suggest to create a new public method directly in BrokerService which called closeTopicForcefullyIfExists
. The javadoc should describe the purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
BrokerService brokerService = pulsar.getBrokerService(); | ||
Field pulsarStatsField = BrokerService.class.getDeclaredField("pulsarStats"); | ||
pulsarStatsField.setAccessible(true); | ||
PulsarStats pulsarStats = brokerService.getPulsarStats(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid using reflection as much as possible in tests. It makes refactoring very hard. My experience refactoring some code of Pulsar is really terrible due to the reflection everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see you want to delay the openLedgerComplete
method by sleeping 1 second in recordTopicLoadTimeValue
so that topicFuture.complete
will happen after the transaction failure.
But I think you can customize the TopicFactory
to achieve the same goal. You can override the create
method to create a PersistentTopic
's subclass whose initialize()
method will be delayed for some time
topic.close(); | ||
return null; | ||
}); | ||
log.error("{} Failed to create snapshot writer", topic.getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to add ex
to the log message here?
// be completed at that time, which could leave closed topics in the cache(at BrokerService). | ||
CompletableFuture<Optional<Topic>> topicFuture = topic.getBrokerService().getTopics().get(topic.getName()); | ||
if (topicFuture != null) { | ||
topicFuture.thenAccept(t -> t.ifPresent(v -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why the value of topics
is an Optional
. I just have a question that when could the value be Optional.empty()
? If so, an empty Optional
will be cached and never removed.
if (topicFuture != null) { | ||
topicFuture.thenAccept(t -> t.ifPresent(v -> { | ||
v.close(true).exceptionally(ec -> { | ||
log.error("Close topic {} exception", v.getName(), ec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If close
failed, the topic will still be cached. Should you call BrokerService#removeTopicFromCache
in this case?
TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor snapshotAbortedTxnProcessor, | ||
AbortedTxnProcessor.SnapshotType snapshotType) { | ||
super(State.None); | ||
this.topic = topic; | ||
this.timer = topic.getBrokerService().getPulsar().getTransactionTimer(); | ||
this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar() | ||
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount(); | ||
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar() | ||
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis(); | ||
this.maxReadPosition = topic.getManagedLedger().getLastConfirmedEntry(); | ||
this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor; | ||
this.snapshotType = snapshotType; | ||
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack(); | ||
this.recover(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new constructor overload has much duplicated code with the original constructor. Please reuse the code. It only just reduces duplicated code, but also simulate the real case more likely.
Here is a bad example. Assuming there is a Demo
class whose constructor is:
public Demo(Argument arg) {
this.field1 = arg.f();
this.field2 = arg.g();
}
If you wanted to pass a mocked field2
and added a new constructor;
Demo(Argument arg, Field2 field2) {
this.field1 = arg.h();
this.field2 = field2;
}
Then the new constructor will be meaningless. Because now field
is not arg.f()
anymore.
hi, @lhotari @BewareMyPower @dao-jun Thanks for details review, After discussing with @poorbarcode , we've decided to pass |
Motivation
When transaction component init fails, it will try to close the topic, but maybe not remove topic from cache.
createTransactionBufferSystemTopicClient
create topic
Failed to create snapshot writer
and close topicModifications
topic.close
run after topicFuture completeVerifying this change
testNoOrphanClosedTopicIfTxnInternalFailed
to reproduce this.Documentation
doc
doc-required
doc-not-needed
doc-complete