Priority-enabled TaskMatcher #7196
Conversation
cdebec0 to 8236e0f (Compare)

What changed? See title. Why? Flaky tests no good!

What changed? Wait for clusters to be synced instead of using Sleep, and make sure the second run is started before failover. Why? The current implementation depends on timing, which is not reliable. How did you test it? Repeatedly ran the test locally with no failures.
createTime := ts.AsTime().UnixNano()
count := delta
if prev, ok := b.tree.Get(createTime); ok {
is treemap thread safe?
No, backlogAgeTracker is not thread safe (commented on line 35), the owner has to synchronize access. The philosophy of the new matcher is to put everything under one lock per matcher (1:1 with partition), to simplify code and reduce overhead.
So this method needs to be called under a lock? I think we need to use a convention like methodLocked() to indicate that a method must be called with the lock held.
For structs that have both locked and unlocked methods, maybe. This is just a simple data structure. The whole thing is either thread-safe or not. For comparison, maps and slices are not thread-safe, and the functions that manipulate them in the stdlib maps and slices packages don't have any special annotations; it's understood that synchronization is the responsibility of the caller.
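For illustration only (the names here are made up, not from this PR), a minimal sketch of the methodLocked() convention being discussed, where the suffix marks methods that assume the caller already holds the lock:

```go
import "sync"

type counterSet struct {
	mu     sync.Mutex
	counts map[string]int
}

// addLocked must be called with c.mu held; it is a plain data operation.
func (c *counterSet) addLocked(key string, delta int) {
	c.counts[key] += delta
}

// Add is the thread-safe entry point that owns the locking.
func (c *counterSet) Add(key string, delta int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.addLocked(key, delta)
}
```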
service/matching/matcher.go
Outdated
startTime    time.Time
forwardCtx   context.Context // non-nil iff poll can be forwarded
pollMetadata *pollMetadata   // non-nil iff poll can be forwarded
queryOnly    bool            // if true, poller can be given only query task, otherwise any task
Do we have pollers that are query-only?
Yes, all pollers on a passive namespace are query-only
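(For context, a rough sketch of how such a flag gates matching; the isQuery() helper here is an assumption for illustration, not necessarily this PR's code:)

```go
// Sketch: a query-only poller (e.g. one from a passive namespace) must be
// skipped for anything that is not a query task.
if poller.queryOnly && !task.isQuery() { // isQuery() assumed for illustration
	continue
}
```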
service/matching/matcher.go
Outdated
for range tm.config.ForwarderMaxOutstandingTasks() {
	go tm.forwardTasks(lim, retrier)
}
for range tm.config.ForwarderMaxOutstandingPolls() {
	go tm.forwardPolls()
We need to think about whether we could auto-adjust those; having too many configs is a problem because we don't know which one to tweak.
These configs have been here forever (and afaik have never been adjusted)
	}
}

func bugIf(cond bool, msg string) {
Shall we panic in prod, or only in tests? Making checks too stringent and panicking could hurt prod reliability for small bugs.
I don't plan to leave the panics in prod
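For reference, a minimal sketch of a guard whose panic can be confined to tests; the panicOnBug switch is hypothetical, not part of this PR:

```go
import "log"

// panicOnBug would be enabled in tests only; in prod the check just logs,
// so a small invariant violation can't take down a whole host.
var panicOnBug = false

func bugIf(cond bool, msg string) {
	if !cond {
		return
	}
	if panicOnBug {
		panic(msg)
	}
	log.Printf("invariant violated: %s", msg)
}
```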
service/matching/matcher.go
Outdated
var cancel context.CancelFunc
if task.forwardCtx != nil {
	// Use sync match context if we have it (for deadline, headers, etc.)
	// TODO(pri): does it make sense to subtract 1s from the context deadline here?
No. I think it is the final matcher (the one that waits on task arrival) that needs to subtract 1s from the context.
You mean the one that waits for poll arrival? Maybe that makes more sense. Although in theory, the time it takes for a match to propagate from a parent down to a distant child after n hops is linear in n, so it sort of makes sense to subtract something on each hop.
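A sketch of the idea under discussion (the one-second margin and where to apply it are exactly the open question, so this is illustrative only):

```go
// Shrink the forwarded context's deadline by a margin on each hop so the
// response has time to propagate back before the caller's deadline fires.
if deadline, ok := task.forwardCtx.Deadline(); ok {
	childCtx, cancel := context.WithDeadline(task.forwardCtx, deadline.Add(-time.Second))
	defer cancel()
	forwardUsing(childCtx) // hypothetical: the actual forwarding call
}
```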
service/matching/matcher.go
Outdated
task      *internalTask
poller    *waitingPoller
ctxErr    error // set if context timed out/canceled
ctxErrIdx int   // index of context that closed first
Is there an array of contexts? What is it?
See Enqueue{Task,Poller}AndWait
Not that I want you to spend a lot of time on this, but probably a bunch of questions are answered in the walkthrough recording. The final version will have more high-level architecture comments, once all the structure is settled.
(Although writing long comments is dangerous since they inevitably get out of date and no one fixes them)
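For illustration, a helper like the one below shows what a field such as ctxErrIdx can capture when a wait is bounded by more than one context (this helper is a sketch, not the PR's actual code):

```go
import (
	"context"
	"reflect"
)

// waitAny blocks until any of the given contexts is done and reports which one
// closed first, which is what ctxErrIdx records.
func waitAny(ctxs ...context.Context) (idx int, err error) {
	cases := make([]reflect.SelectCase, len(ctxs))
	for i, ctx := range ctxs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
	}
	chosen, _, _ := reflect.Select(cases)
	return chosen, ctxs[chosen].Err()
}
```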
} else if task.isPollForwarder && poller.forwardCtx == nil {
	continue
} else if poller.isTaskForwarder && !allowForwarding {
	continue
} else if poller.isTaskValidator && task.forwardCtx != nil {
	continue
What do task.isPollForwarder and poller.isTaskValidator mean?
They're the ones that do poll/task forwarding. An architecture comment or doc will explain how it works.
service/matching/matcher_data.go
Outdated
// limit doesn't allow the task to be matched yet.
// call with lock held
func (d *matcherData) findMatch(allowForwarding bool) (*internalTask, *waitingPoller) {
	// FIXME: optimize so it's not O(d*n) worst case
Ideally, this should be O(log N) + O(log M), where N and M are the numbers of tasks and pollers; it's just picking the top item from both heaps.
This function is just picking, so it's O(1) in the common case, and yeah, O(log n + log m) to remove them after a match. But in general there won't be many pollers and many tasks sitting around at the same time; there'll be tasks that immediately get matched with a poller, or pollers that immediately get matched with a task.
The bad behavior is only when you have a backlog of tasks, and also a lot of query-only pollers. We can handle that with some additional bookkeeping for query-only pollers and query tasks.
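Sketched out, the common path looks roughly like this (the field and helper names are assumptions, not the PR's exact code):

```go
// Peeking the top of each heap is O(1); the O(log N) + O(log M) cost is only
// paid by the heap.Pop calls once a match is actually made.
if len(d.tasks.heap) == 0 || len(d.pollers.heap) == 0 {
	return nil, nil
}
task, poller := d.tasks.heap[0], d.pollers.heap[0]
if !eligible(task, poller) { // same eligibility checks as in the scan above
	return nil, nil // the real code may scan further; this is the common case
}
heap.Pop(&d.tasks)   // O(log N)
heap.Pop(&d.pollers) // O(log M)
return task, poller
```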
	lastPoller time.Time // most recent poll start time
}

type pollerPQ struct {
Do we really need a heap for the pollers? All pollers are the same. Could a FIFO array work for pollers?
Yes and no... regular pollers are the same, but the "task forwarder" is a poller in this architecture and it should be prioritized last. Of course, we can do similar logic with a simple array, just by keeping it on the side. This was just the easiest way to write it at first.
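A sketch of how a heap comparator can express that ordering (the heap field is assumed; isTaskForwarder and startTime appear in the snippets above):

```go
// Regular pollers are served FIFO by poll start time; the task-forwarder
// "poller" always sorts last so real pollers win any contest for a task.
func (p *pollerPQ) Less(i, j int) bool {
	a, b := p.heap[i], p.heap[j]
	if a.isTaskForwarder != b.isTaskForwarder {
		return !a.isTaskForwarder // non-forwarders come first
	}
	return a.startTime.Before(b.startTime) // otherwise FIFO
}
```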
service/matching/task_reader.go
Outdated
retrier     backoff.Retrier
loadedTasks atomic.Int64

backlogAgeLock sync.Mutex
Can we move this lock inside backlogAgeTracker, and let backlogAgeTracker decide when to lock/unlock?
I want locking to be managed by the owner of backlogAgeTracker; it should be a pure data structure.
I'd actually like to remove even more locks from matching. We don't need such fine-grained locking; we get plenty of concurrency from having many task queues in the same process.
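Concretely, the owner-managed pattern looks roughly like this (the tracker field and its record method are illustrative names):

```go
// taskReader owns both the mutex and the tracker; backlogAgeTracker itself
// stays a plain data structure with no locking of its own.
func (tr *taskReader) recordBacklogTask(createTime time.Time, delta int) {
	tr.backlogAgeLock.Lock()
	defer tr.backlogAgeLock.Unlock()
	tr.backlogAge.record(createTime, delta) // illustrative method name
}
```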
service/matching/task_reader.go
Outdated
}

func (tr *taskReader) getTasksPump(ctx context.Context) error {
	ctx = tr.backlogMgr.contextInfoProvider(ctx)
	tr.readerCtx = ctx

	if err := tr.backlogMgr.WaitUntilInitialized(ctx); err != nil {
Is there a risk that, between when the signal was sent and this check, config.GetTasksBatchSize() changed so that the check becomes true without any new tasks actually being loaded? Since we only send one signal when it crosses the check, we may end up missing a task load and not be able to recover.
well... currently we signal every minute anyway. though I want to remove that (eventually).
but yeah, in theory that could happen. that's one of the things we have to be careful of when we remove the periodic signal.
also, this reload condition needs to change for fairness, so the related code will definitely be improved.
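One way to guard against a lost edge-triggered signal, sketched here with hypothetical helpers (notifyC, loadMoreTasks) and an assumed threshold, is to keep the reload condition level-triggered by re-checking it on every wakeup, including a periodic one:

```go
ticker := time.NewTicker(time.Minute) // today's periodic signal plays this role
defer ticker.Stop()
for {
	select {
	case <-tr.notifyC: // hypothetical signal channel
	case <-ticker.C: // safety net: periodic re-check
	case <-ctx.Done():
		return ctx.Err()
	}
	// Re-evaluate the condition against the current config on every wakeup, so
	// a config change between signal and check can't permanently strand a load.
	if tr.loadedTasks.Load() < int64(tr.config.GetTasksBatchSize())/2 {
		tr.loadMoreTasks(ctx) // hypothetical helper
	}
}
```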
What changed?
- Rewrite TaskMatcher to use explicit priority queues, added as priTaskMatcher.
- Changed taskReader to push all tasks into priTaskMatcher, loading more when the number outstanding is too low. Added as priTaskReader.
- Modified Forwarder to work with priTaskMatcher, added as priForwarder.
- Config switch to use the old or new matcher, and various small changes to support that.

Functional tests: This passes all functional tests, but currently only the versioning functional tests flip the switch to use the new matcher (they exercise matching the most).
Unit tests: Tests for the new matcher are not there yet. Other tests will need to be modified.

Why?
- Ability to prioritize uniformly across all pending tasks, including queries and nexus.
- Simpler architecture: no new component between taskReader and TaskMatcher.
- More separated concerns, e.g. "forwarding" is (mostly) in one place instead of spread around, "rate limiting" is in one place, etc.
- Easier-to-understand code, no more nested selects (this is subjective, of course).
- Some behavior improvements, e.g. forwarded backlog tasks don't bounce back and forth anymore.
- Maybe better performance (after optimizations).

How did you test it?
Existing functional tests; unit tests need to be added/updated.

Potential risks
Lots of new code, so there may be new bugs.