Skip to content

Commit

Permalink
VStreamer: recompute table plan if a new table is encountered for the…
Browse files Browse the repository at this point in the history
… same id (#9978)

* Recompute the plan for a table if a new table is encountered for the same id

Signed-off-by: Rohit Nayak <[email protected]>

* Trigger rebuild

Signed-off-by: Rohit Nayak <[email protected]>

* Trigger rebuild

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Mar 26, 2022
1 parent 38ff56b commit b100e7c
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,14 +601,23 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
// will generate a new plan and FIELD event.
id := ev.TableID(vs.format)

if _, ok := vs.plans[id]; ok {
return nil, nil
}

tm, err := ev.TableMap(vs.format)
if err != nil {
return nil, err
}

if plan, ok := vs.plans[id]; ok {
// When the underlying mysql server restarts the table map can change.
// Usually the vstreamer will also error out when this happens, and vstreamer re-initializes its table map.
// But if the vstreamer is not aware of the restart, we could get an id that matches one in the cache, but
// is for a different table. We then invalidate and recompute the plan for this id.
if plan == nil || plan.Table.Name == tm.Name {
return nil, nil
}
vs.plans[id] = nil
log.Infof("table map changed: id %d for %s has changed to %s", id, plan.Table.Name, tm.Name)
}

if tm.Database == "_vt" && tm.Name == "resharding_journal" {
// A journal is a special case that generates a JOURNAL event.
return nil, vs.buildJournalPlan(id, tm)
Expand Down

0 comments on commit b100e7c

Please sign in to comment.