-
Notifications
You must be signed in to change notification settings - Fork 357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: avoid payload limitation [MD-266] #9164
Conversation
✅ Deploy Preview for determined-ui canceled.
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #9164 +/- ##
==========================================
- Coverage 48.98% 48.98% -0.01%
==========================================
Files 1234 1234
Lines 159823 159886 +63
Branches 2780 2780
==========================================
+ Hits 78291 78316 +25
- Misses 81357 81395 +38
Partials 175 175
Flags with carried forward coverage won't be shown. Click here to find out more.
|
fd9a44b
to
415e428
Compare
116171b
to
49f3554
Compare
49f3554
to
5b77eec
Compare
@@ -0,0 +1,78 @@ | |||
CREATE OR REPLACE FUNCTION stream_project_change() RETURNS TRIGGER AS $$ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly, Nick has moved all sql functions out of migration files. I will update this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you rebase so i can get a diff instead (very excited for this change)
@@ -0,0 +1,78 @@ | |||
CREATE OR REPLACE FUNCTION stream_project_change() RETURNS TRIGGER AS $$ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you rebase so i can get a diff instead (very excited for this change)
5b77eec
to
d664ee3
Compare
d664ee3
to
7ba0b61
Compare
master/pkg/stream/stream.go
Outdated
} | ||
|
||
// Publisher is responsible for publishing messages of type T | ||
// to streamers associate with active subscriptions. | ||
type Publisher[T Msg] struct { | ||
Lock sync.Mutex | ||
Subscriptions map[*Subscription[T]]struct{} | ||
Subscriptions *list.List |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change this back to map. The previous hydration(hydrate message while broadcasting) requires the knowledge of the subscription order for testing purposes. If the broadcast order to users changes, the output message will also change. This is no longer an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you going to change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
7ba0b61
to
2f26cec
Compare
// its id. | ||
func ModelVersionMakeHydrator() func(*ModelVersionMsg) (*ModelVersionMsg, error) { | ||
return func(msg *ModelVersionMsg) (*ModelVersionMsg, error) { | ||
var modelVersionMsg ModelVersionMsg |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for such a short lived var, use a shorter name
var modelVersionMsg ModelVersionMsg | |
var msg ModelVersionMsg |
but we can't call it msg
since it exists. so, why not just query into the existing msg
? or pass just the id and make a new var msg ModelVersionMsg
if it's important not to mutate for some reason.
master/internal/stream/publisher.go
Outdated
|
||
idToRecordCache := map[int]stream.RecordCache{} | ||
for _, ev := range events { | ||
idToRecordCache = publisher.HydrateMsg(ev.After, idToRecordCache) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perf: could we hydrate the entire batch of ids?
@@ -171,14 +176,14 @@ func ModelVersionCollectStartupMsgs( | |||
query = modelVersionPermFilterQuery(query, accessScopes) | |||
} | |||
err := query.Scan(ctx, &mvMsgs) | |||
if err != nil && errors.Cause(err) != sql.ErrNoRows { | |||
if errors.Is(err, sql.ErrNoRows) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this check is inverted now. should it be this instead:
if errors.Is(err, sql.ErrNoRows) { | |
if err != nil && !errors.Is(err, sql.ErrNoRows) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
query := db.Bun().NewSelect().Model(&modelVersionMsg).Where("id = ?", msg.GetID()).ExcludeColumn("workspace_id") | ||
err := query.Scan(context.Background(), &modelVersionMsg) | ||
if errors.Is(err, sql.ErrNoRows) { | ||
return nil, err | ||
} else if err != nil { | ||
log.Errorf("error in model version hydrator: %v\n", err) | ||
return nil, err | ||
} | ||
modelVersionMsg.WorkspaceID = msg.WorkspaceID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the point of not querying the workspace_id and setting it here?
I think this finally shows an issue with this hydration scheme, what if the queue is backed up and since this event fired the object has been moved and had changes and a user that shouldn't see it will now see the new version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workspace_id
is not in the model_version table. We get it by querying model table: https://github.com/determined-ai/determined/pull/9164/files#diff-b2fad5a40672d2b3108e07a06aa940de18c5a72dc6c22aa35c33c7d9b7d1e501R70, and send it to the backend as part of the raw message. project and model doesn't have this issue because the corresponding tables have workspace_id
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have thought about return func(ID int, workspaceID *int) (*ModelVersionMsg, error) {
, but I'm not sure if it's better than the current version.
master/internal/stream/models.go
Outdated
@@ -163,14 +168,14 @@ func ModelCollectStartupMsgs( | |||
query = permFilterQuery(query, accessScopes) | |||
} | |||
err := query.Scan(ctx, &modelMsgs) | |||
if err != nil && errors.Cause(err) != sql.ErrNoRows { | |||
if errors.Is(err, sql.ErrNoRows) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if errors.Is(err, sql.ErrNoRows) { | |
if err != nil && !errors.Is(err, sql.ErrNoRows) { |
master/pkg/stream/stream.go
Outdated
} | ||
|
||
// Publisher is responsible for publishing messages of type T | ||
// to streamers associate with active subscriptions. | ||
type Publisher[T Msg] struct { | ||
Lock sync.Mutex | ||
Subscriptions map[*Subscription[T]]struct{} | ||
Subscriptions *list.List |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you going to change this?
master/pkg/stream/stream.go
Outdated
if errors.Is(err, sql.ErrNoRows) { | ||
// This id has deleted. | ||
idToRecordCache[rawMsg.GetID()] = RecordCache{DeleteMsg: rawMsg.DeleteMsg()} | ||
return idToRecordCache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you're going to mutate it don't bother returning it.
master/pkg/stream/stream.go
Outdated
// HydrateMsg queries the DB by the ID from rawMsg of a upsert or fallin event | ||
// and get the full record. | ||
func (p *Publisher[T]) HydrateMsg(rawMsg T, idToRecordCache map[int]RecordCache) map[int]RecordCache { | ||
if reflect.ValueOf(rawMsg).IsNil() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in what case would this happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ev.After
could be nil if we are getting a delete event: https://github.com/determined-ai/determined/pull/9164/files#diff-b26a785f68ba6db1891fcaf34b3c7b021cd333dde422e2f22f0983a4d83302a5R438
for _, ev := range events { | ||
var msg interface{} | ||
switch { | ||
case ev.After != nil && sub.filter(*ev.After) && sub.permissionFilter(*ev.After): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to change this logic at all. why can't we just pass events []Event[T]
that are saturated? i was under the impression that is what we were going to do. i expected the diff now to just be a few lines in publisher.go like satured, err := saturateEvents(events)
and then the rest of the code unchanged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- This handles the case of :
events = [insert/fallin, delete]
If the last event is a deletion event, we are not able to saturate, so we can't send the message for the insert or fallin. Its ID is stored in userNotKnownIDs
for skipping the deletion event, so the client doesn't have to worry about handling a non-existent deletion.
- It handles the case of:
events = [update, update, update]
We are only able saturated message corresponding to the last update event or an event after the current batch if there is a queue. I use the logic below to avoid sending duplicate messages.
if cachedSeq == afterMsg.SeqNum() {
msg = sub.Streamer.PrepareFn(recordCache.UpsertMsg)
- If events = [update, fallout, update]
Since the saturate message is from the second update, we don't want to send message to the client when we process the first update event. The user shouldn't receive it since it's getting a fallout before the second update event. This is handle by:
if cachedSeq == afterMsg.SeqNum() {
msg = sub.Streamer.PrepareFn(recordCache.UpsertMsg)
- If events = [fallin/insert, fallout]
The saturate message is from fallout. We don't want to send the message when processing fallin/insert. This is also handle by:
if cachedSeq == afterMsg.SeqNum() {
msg = sub.Streamer.PrepareFn(recordCache.UpsertMsg)
We also don't want to send a deletion message when we process the fallout event. I use userNotKnownIDs
to store its ID during fallin/insert and no deletion message will be send when we process the fallout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we not sending a deletion message when we process fallout? In the past we had treated a fallout as if it was a deletion, is that not the case anymore? My concern with not sending a deletion is the client will still believe it exists when it's no longer in scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not sending deletion message when we process fallout in case 4.
We are not able to saturate fallin/insert, so we can't send the message. Its ID is stored in userNotKnownIDs
for skipping the fallout event, so the client doesn't have to worry about handling a non-existent deletion. Sorry for the confusion. When you are up for a chat, happy to about it. I'm not sure how to make this logic easier to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I believe I'm following, since the client doesn't know anything about the entity we aren't going to send anything to it; it be odd to send a deletion message to someone about an entity they didn't know existed, especially since there's no way to saturate the entity without exposing information outside of the users permission scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah exactly!
@@ -24,7 +24,7 @@ const ( | |||
// otherwise, returns the MarshallableMsg that the streamer sends. | |||
func testPrepareFunc(i stream.MarshallableMsg) interface{} { | |||
switch msg := i.(type) { | |||
case stream.UpsertMsg: | |||
case *stream.UpsertMsg: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious, why do we need to change to pointer types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testPrepareFunc
is called in Broadcast()
https://github.com/determined-ai/determined/pull/9164/files#diff-ab55b57cea1409ab6f97b1352891adb036d83ee04a82f8428eee733d70351cf6R247:
msg = sub.Streamer.PrepareFn(recordCache.UpsertMsg)
The type of recordCache.UpsertMsg
is *stream.UpsertMsg
.
8925027
to
cb878c9
Compare
@@ -171,14 +176,14 @@ func ModelVersionCollectStartupMsgs( | |||
query = modelVersionPermFilterQuery(query, accessScopes) | |||
} | |||
err := query.Scan(ctx, &mvMsgs) | |||
if err != nil && errors.Cause(err) != sql.ErrNoRows { | |||
if err != nil && !errors.Is(err, sql.ErrNoRows) { | |||
log.Errorf("error: %v\n", err) | |||
return nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: is there a reason we aren't wrapping this error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will wrap it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I think the performance improvement comment for batching hydration is a valid point, the IO tradeoff vs the application logic to handle deletion events can get pretty substantial in higher volume settings.
7bda69d
to
51ce188
Compare
60c5746
to
1732752
Compare
Ticket
MD-266
Description
Test Plan
Pass CI tests.
Checklist
docs/release-notes/
.See Release Note for details.