[feat][broker][PIP-195] Implement delayed message index bucket snapshot(merge/delete) - part8 #19138
After BucketDelayedDeliveryTracker.recover, some of the delayed messages will be put into the delayed message queue, but these messages may be reread by the managed cursor. Will this cause repeated delivery?
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

@NotThreadSafe
public class TripleLongPriorityDelayedIndexQueue implements DelayedIndexQueue {
I notice that this class exists to make peek and pop easier to use, and it is only used by MutableBucket. Should it really exist as a separate public class?
I will remove the public modifier.
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;

@NotThreadSafe
public class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
I notice that this class exists to make peek and pop easier to use, and it is only used when merging two queues in BucketDelayedDeliveryTracker. Should it really exist as a separate public class?
Hello @coderzc
And this comment (^_^)
Sorry, I missed this
import java.util.Objects;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;

public interface DelayedIndexQueue {
This interface is only used by MutableBucket and is not general-purpose. Is this interface really needed?
I will remove the public modifier.
CompletableFuture<Void> removeAFuture = bucketA.asyncDeleteBucketSnapshot();
CompletableFuture<Void> removeBFuture = bucketB.asyncDeleteBucketSnapshot();

return CompletableFuture.allOf(removeAFuture, removeBFuture).thenRun(() -> {
We save the newly merged bucket only after deleting the original buckets, which may cause bucket loss if the save fails. I think losing buckets is tolerable, right?
Maybe we can persist the newly merged bucket first, then delete the original buckets. If saving the newly merged bucket fails, the tracker needs to rebuild the index, right?
If the bucket index is lost, the tracker can rebuild the index, but if there are two overlapping buckets, it will cause data confusion.
- Delete the original bucket index first, then save the newly merged index. This may lose the bucket index, but the tracker will rebuild the index.
- Save the newly merged index first, then delete the original bucket index. This may cause intersecting buckets, and we need to find a way to clean up the original buckets.
OK, I think there are two ways, and the first way is simple and clear.
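The two orderings listed above can be sketched with CompletableFuture chaining. This is only an illustration: SnapshotStore, save, and delete are hypothetical names invented for the sketch, not Pulsar APIs, and the in-memory log stands in for real snapshot storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory stand-in for bucket snapshot storage; not a Pulsar API.
final class SnapshotStore {
    final List<String> log = new ArrayList<>();

    CompletableFuture<Void> save(String bucket) {
        log.add("save " + bucket);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> delete(String bucket) {
        log.add("delete " + bucket);
        return CompletableFuture.completedFuture(null);
    }
}

final class MergeOrdering {
    // Option 1: delete the originals, then save the merged bucket.
    // A crash in between loses the index, which the tracker can rebuild.
    static CompletableFuture<Void> deleteThenSave(SnapshotStore s, String a, String b, String merged) {
        return CompletableFuture.allOf(s.delete(a), s.delete(b))
                .thenCompose(ignored -> s.save(merged));
    }

    // Option 2: save the merged bucket, then delete the originals.
    // A crash in between leaves overlapping buckets that need cleanup.
    static CompletableFuture<Void> saveThenDelete(SnapshotStore s, String a, String b, String merged) {
        return s.save(merged)
                .thenCompose(ignored -> CompletableFuture.allOf(s.delete(a), s.delete(b)));
    }
}
```

In both variants the second step only starts once the first has completed successfully, so the trade-off is purely about which partial state a crash can leave behind.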
Questions:
- When is the index rebuilt?
- Should we unload the topic to rebuild the index if this operation fails?
> When is the index rebuilt?
If the index is lost, the tracker rebuilds it when the messages are reread by the managed cursor.
> Should we unload the topic to rebuild the index if this operation fails?
Yes, we can unload the topic to rebuild the index.
Can we mark the old segment as deleting or merging? So that we can continue the merge operation after the broker crashes.
> Can we mark the old segment as deleting or merging? So that we can continue the merge operation after the broker crashes.
This will introduce more state in the metadata; we can improve it when we really need it.
I found a way to clean up the old overlapping buckets when recovering buckets, so that we can persist the newly merged bucket first, then delete the original buckets.
The approach is as follows:
If there are ranges [1...10], [30...40], and [1...40], then only [1...40] will remain.
.../src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
Outdated
Messages that have already been added to the tracker will be skipped when reread by the managed cursor. For more, see #19035.
aa805a1 to 621a8e8
Nice work! LGTM
// TODO merge bucket snapshot (synchronize operate)
try {
    asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
Please call Thread.currentThread().interrupt() if we get an InterruptedException.
Hi @mattisonchao
I wonder why we should keep the interrupted state. Is this state meant to be handed over to Netty?
Maybe not. But I think we should preserve this state unless we have a specific reason not to.
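For reference, the usual Java idiom for the point discussed above looks roughly like the following. This is a hedged sketch, not the PR's code: BlockingWait and await are hypothetical names, and mapping failures to IllegalStateException is just one possible policy.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of the PR.
final class BlockingWait {
    // Waits for the future; returns true on success and false on timeout.
    static boolean await(CompletableFuture<Void> future, long timeoutSeconds) {
        try {
            future.get(timeoutSeconds, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the thread's interrupt flag.
            // Restore it so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (TimeoutException e) {
            return false;
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the underlying failure.
            throw new IllegalStateException("Operation failed", e.getCause());
        }
    }
}
```

Restoring the flag matters because whoever owns the thread (an executor, for example) may rely on it to shut down or cancel work; swallowing it silently hides the request.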
try {
    asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    throw new RuntimeException(e);
Do you think we should consider the specific exception?
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
Outdated
synchronized (immutableBuckets) {
    immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
}
bucket.asyncDeleteBucketSnapshot();
We need a log to show the exception.
Add exception log in asyncDeleteBucketSnapshot
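One way such logging can be attached to a fire-and-forget async call is sketched below. It is an assumption-laden illustration: DeleteLogging and withFailureLog are hypothetical names, and java.util.logging is used only to keep the example self-contained (the broker code uses its own logging setup).

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper for illustration; not part of the PR.
final class DeleteLogging {
    private static final Logger LOG = Logger.getLogger(DeleteLogging.class.getName());

    // Attaches a failure log to an async delete without changing its outcome.
    static CompletableFuture<Void> withFailureLog(CompletableFuture<Void> delete, String bucketKey) {
        return delete.whenComplete((ignored, ex) -> {
            if (ex != null) {
                LOG.log(Level.WARNING, "Failed to delete bucket snapshot " + bucketKey, ex);
            }
        });
    }
}
```

Because whenComplete passes the original result or exception through, callers that do inspect the returned future still see the failure, while callers that ignore it at least leave a trace in the log.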
...oker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
Overall LGTM, I need to review the whole logic after this PR to ensure I got all the context.
76a8619 to 525e30f
525e30f to 14b5ed2
Codecov Report
@@ Coverage Diff @@
## master #19138 +/- ##
============================================
- Coverage 47.00% 46.30% -0.70%
- Complexity 10639 18434 +7795
============================================
Files 713 1625 +912
Lines 69672 132248 +62576
Branches 7482 14560 +7078
============================================
+ Hits 32746 61234 +28488
- Misses 33212 64735 +31523
- Partials 3714 6279 +2565
PIP: #16763
Motivation
PIP: #16763
Modifications
- Add DelayedIndexQueue interface to abstract CombinedSegmentDelayedIndexQueue and TripleLongPriorityDelayedIndexQueue.
- Implement delayed message index bucket snapshot (merge/delete).
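To illustrate the idea behind a shared peek/pop abstraction, here is a simplified sketch. It assumes plain long timestamps instead of the snapshot's DelayedIndex entries, and SimpleIndexQueue, HeapIndexQueue, and CombinedIndexQueue are hypothetical names rather than the classes added in this PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;

// Hypothetical simplified interface; the PR's version works on snapshot index entries.
interface SimpleIndexQueue {
    boolean isEmpty();
    long peek(); // smallest timestamp, without removing it
    long pop();  // smallest timestamp, removing it
}

// Backed by a heap, so elements may be added in any order.
final class HeapIndexQueue implements SimpleIndexQueue {
    private final PriorityQueue<Long> heap = new PriorityQueue<>();

    HeapIndexQueue(long... timestamps) {
        for (long t : timestamps) {
            heap.add(t);
        }
    }

    public boolean isEmpty() { return heap.isEmpty(); }
    public long peek() { return heap.element(); }
    public long pop() { return heap.remove(); }
}

// Merges two already-sorted sequences by always serving the smaller head.
final class CombinedIndexQueue implements SimpleIndexQueue {
    private final Deque<Long> a = new ArrayDeque<>();
    private final Deque<Long> b = new ArrayDeque<>();

    CombinedIndexQueue(long[] sortedA, long[] sortedB) {
        for (long t : sortedA) { a.add(t); }
        for (long t : sortedB) { b.add(t); }
    }

    // Picks whichever side currently holds the smaller head element.
    private Deque<Long> next() {
        if (a.isEmpty()) { return b; }
        if (b.isEmpty()) { return a; }
        return a.peekFirst() <= b.peekFirst() ? a : b;
    }

    public boolean isEmpty() { return a.isEmpty() && b.isEmpty(); }
    public long peek() { return next().element(); }
    public long pop() { return next().remove(); }
}
```

Code that drains a queue in timestamp order can then be written once against the interface, regardless of whether the entries come from an in-memory heap or from two sorted inputs being merged.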
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
testMergeSnapshot / DelayedIndexQueueTest
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: coderzc#35