-
Notifications
You must be signed in to change notification settings - Fork 289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
scheduler(2pc): agent for 2 phase scheduling #5593
scheduler(2pc): agent for 2 phase scheduling #5593
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
365c2bf
to
182f2a9
Compare
cdc/scheduler/internal/tp/agent.go
Outdated
// This panic will happen only if two messages have been received | ||
// with the same ownerRev but with different ownerIDs. | ||
// This should never happen unless the election via Etcd is buggy. | ||
log.Panic("owner IDs do not match", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if p2p batches messages of different owners?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not panic, because message from different owner should have different revision.
This is for two owner with the same revision, but different owner ID
, which is unrecoveriable etcd error.
cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go
Outdated
Show resolved
Hide resolved
@@ -573,6 +576,7 @@ func (r *ReplicationSet) handleRemoveTable() ([]*schedulepb.Message, error) { | |||
zap.Any("replicationSet", r), zap.Int64("tableID", r.TableID)) | |||
return nil, nil | |||
} | |||
// todo: OldState must be `replicating` here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we deal with these todo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks OldState
is only used for logging, we can remove it from the log, since we know that it's must be in desired state.
cdc/scheduler/internal/tp/agent.go
Outdated
|
||
// pendingTasks is a queue of dispatch table task yet to be processed. | ||
// the Deque stores *dispatchTableTask. | ||
pendingTasks deque.Deque |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a deque? can we handle tasks immediately? Owner control the total number of task that are running concurrently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider that dispatchTableRequest
A and B received, sent by owner 1, and followed by heartbeat
or dispatchTableRequest
sent by owner 2. A and B should not be processed.
By putting tasks in pending states, make sure this tick does not do unnecessary task handling.
But I consider that, no matter owner switch or not, all tasks should be handled, this can reduce unnecessary table rescheduling, and ongoing old task does not affect global table distribution and scheduling correctly, since the heartbeat can help the new owner detect all tables' states.
So, we can remove pendingTasks
, just handling all dispatchTableTasks
directly.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
* fix some typo. * update table scheduler proto * add some new to agent. * track owner info. * try to handle dispatch table request. * add more and more to agent implementation. * fix update owner info. * finish handle dispatch table. * tackle epoch * remove checkpoint from proto * handle heartbeat with stopping. * add benchmark for heartbeat response. * fix agent. * fix agent code layout. * refine benchmark test. * refine coordinator / capture_manager / relication_manager. * fix agent. * add a lot of test. * revise the code. * fix by suggestion. * fix by suggestion. * remoe pendingTask. * fix unit test.
What problem does this PR solve?
Issue Number: ref #4757
What is changed and how it works?
2 phase scheduling agent implementation.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note