Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing error on deletes from owning table for a lookup being populated by CreateLookupVindex after a SwitchWrites #8701

Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"sort"
"time"

"vitess.io/vitess/go/vt/vtgate/evalengine"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -232,16 +230,6 @@ func (upd *Update) updateVindexEntries(vcursor VCursor, bindVars map[string]*que
for _, colVindex := range upd.Table.Owned {
// Update columns only if they're being changed.
if updColValues, ok := upd.ChangedVindexValues[colVindex.Name]; ok {
offset := updColValues.Offset
if !row[offset].IsNull() {
val, err := evalengine.ToInt64(row[offset])
if err != nil {
return err
}
if val == int64(1) { // 1 means that the old and new value are same and vindex update is not required.
continue
}
}
frouioui marked this conversation as resolved.
Show resolved Hide resolved
fromIds := make([]sqltypes.Value, 0, len(colVindex.Columns))
var vindexColumnKeys []sqltypes.Value
for _, vCol := range colVindex.Columns {
Expand Down
178 changes: 178 additions & 0 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,184 @@ func TestDeleteScatter(t *testing.T) {
}
}

func TestUpdateEqualWithWriteOnlyLookupUniqueVindex(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|lu_col|lu_col", "int64|int64|int64|int64"),
"1|2|1|1",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)

_, err := executorExec(executor, "update t2_wo_lookup set lu_col = 5 where wo_lu_col = 2", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, lu_col, lu_col = 5 from t2_wo_lookup where wo_lu_col = 2 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_wo_lookup set lu_col = 5 where wo_lu_col = 2",
BindVariables: map[string]*querypb.BindVariable{},
}}

utils.MustMatch(t, wantQueries, sbc1.Queries)
utils.MustMatch(t, wantQueries, sbc2.Queries)

bq1 := &querypb.BoundQuery{
Sql: "delete from lu_idx where lu_col = :lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": sqltypes.Uint64BindVariable(1),
"lu_col": sqltypes.Int64BindVariable(1),
},
}
bq2 := &querypb.BoundQuery{
Sql: "insert into lu_idx(lu_col, keyspace_id) values (:lu_col_0, :keyspace_id_0)",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id_0": sqltypes.Uint64BindVariable(1),
"lu_col_0": sqltypes.Int64BindVariable(5),
},
}
lookWant := []*querypb.BoundQuery{bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2}
utils.MustMatch(t, lookWant, sbcLookup.Queries)
}

func TestUpdateEqualWithMultipleLookupVindex(t *testing.T) {
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, nil)

sbcLookup.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("lu_col|keyspace_id", "int64|varbinary"),
"1|1",
)})

sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|lu_col|lu_col", "int64|int64|int64|int64"),
"1|2|1|1",
)})

_, err := executorExec(executor, "update t2_wo_lookup set lu_col = 5 where wo_lu_col = 2 and lu_col = 1", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, lu_col, lu_col = 5 from t2_wo_lookup where wo_lu_col = 2 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update t2_wo_lookup set lu_col = 5 where wo_lu_col = 2 and lu_col = 1",
BindVariables: map[string]*querypb.BindVariable{},
}}

vars, _ := sqltypes.BuildBindVariable([]interface{}{
sqltypes.NewInt64(1),
})
lookWant := []*querypb.BoundQuery{{
Sql: "select lu_col, keyspace_id from lu_idx where lu_col in ::lu_col for update",
BindVariables: map[string]*querypb.BindVariable{
"lu_col": vars,
},
}, {
Sql: "delete from lu_idx where lu_col = :lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": sqltypes.Uint64BindVariable(1),
"lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "insert into lu_idx(lu_col, keyspace_id) values (:lu_col_0, :keyspace_id_0)",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id_0": sqltypes.Uint64BindVariable(1),
"lu_col_0": sqltypes.Int64BindVariable(5),
},
}}
utils.MustMatch(t, lookWant, sbcLookup.Queries)
utils.MustMatch(t, wantQueries, sbc1.Queries)
assert.Nil(t, sbc2.Queries)
}

func TestDeleteEqualWithWriteOnlyLookupUniqueVindex(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|lu_col", "int64|int64|int64"),
"1|1|1",
)}
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, res)

_, err := executorExec(executor, "delete from t2_wo_lookup where wo_lu_col = 1", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, lu_col from t2_wo_lookup where wo_lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "delete from t2_wo_lookup where wo_lu_col = 1",
BindVariables: map[string]*querypb.BindVariable{},
}}

bq1 := &querypb.BoundQuery{
Sql: "delete from wo_lu_idx where wo_lu_col = :wo_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"wo_lu_col": sqltypes.Int64BindVariable(1),
},
}
bq2 := &querypb.BoundQuery{
Sql: "delete from lu_idx where lu_col = :lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": sqltypes.Uint64BindVariable(1),
"lu_col": sqltypes.Int64BindVariable(1),
},
}
lookWant := []*querypb.BoundQuery{bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2, bq1, bq2}
utils.MustMatch(t, lookWant, sbcLookup.Queries)
utils.MustMatch(t, wantQueries, sbc1.Queries)
utils.MustMatch(t, wantQueries, sbc2.Queries)
}

func TestDeleteEqualWithMultipleLookupVindex(t *testing.T) {
executor, sbc1, sbc2, sbcLookup := createCustomExecutorSetValues(executorVSchema, nil)

sbcLookup.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("lu_col|keyspace_id", "int64|varbinary"),
"1|1",
)})

sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|lu_col", "int64|int64|int64"),
"1|1|1",
)})

_, err := executorExec(executor, "delete from t2_wo_lookup where wo_lu_col = 1 and lu_col = 1", nil)
require.NoError(t, err)
wantQueries := []*querypb.BoundQuery{
{
Sql: "select id, wo_lu_col, lu_col from t2_wo_lookup where wo_lu_col = 1 and lu_col = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "delete from t2_wo_lookup where wo_lu_col = 1 and lu_col = 1",
BindVariables: map[string]*querypb.BindVariable{},
}}

vars, _ := sqltypes.BuildBindVariable([]interface{}{
sqltypes.NewInt64(1),
})
lookWant := []*querypb.BoundQuery{{
Sql: "select lu_col, keyspace_id from lu_idx where lu_col in ::lu_col for update",
BindVariables: map[string]*querypb.BindVariable{
"lu_col": vars,
},
}, {
Sql: "delete from wo_lu_idx where wo_lu_col = :wo_lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": {Type: querypb.Type_VARBINARY, Value: []byte("\x16k@\xb4J\xbaK\xd6")},
"wo_lu_col": sqltypes.Int64BindVariable(1),
},
}, {
Sql: "delete from lu_idx where lu_col = :lu_col and keyspace_id = :keyspace_id",
BindVariables: map[string]*querypb.BindVariable{
"keyspace_id": sqltypes.Uint64BindVariable(1),
"lu_col": {Type: querypb.Type_INT64, Value: []byte("1")},
},
}}
utils.MustMatch(t, lookWant, sbcLookup.Queries)

utils.MustMatch(t, wantQueries, sbc1.Queries)
assert.Nil(t, sbc2.Queries)
}

func TestDeleteByDestination(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
// This query is not supported in v3, so we know for sure is taking the DeleteByDestination route
Expand Down
66 changes: 64 additions & 2 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,26 @@ var executorVSchema = `
"from": "unq_col",
"to": "keyspace_id"
},
"owner": "t1"
"owner": "t1"
},
"t2_wo_lu_vdx": {
"type": "lookup_unique",
"params": {
"table": "TestUnsharded.wo_lu_idx",
"from": "wo_lu_col",
"to": "keyspace_id",
"write_only": "true"
},
"owner": "t2_wo_lookup"
},
"t2_lu_vdx": {
"type": "lookup_hash_unique",
"params": {
"table": "TestUnsharded.lu_idx",
"from": "lu_col",
"to": "keyspace_id"
},
"owner": "t2_wo_lookup"
}
},
"tables": {
Expand Down Expand Up @@ -268,7 +287,23 @@ var executorVSchema = `
"name": "hash_index"
}
]
}
},
"t2_wo_lookup": {
"column_vindexes": [
{
"column": "id",
"name": "hash_index"
},
{
"column": "wo_lu_col",
"name": "t2_wo_lu_vdx"
},
{
"column": "lu_col",
"name": "t2_lu_vdx"
}
]
}
}
}
`
Expand Down Expand Up @@ -300,6 +335,8 @@ var unshardedVSchema = `
"sequence": "user_seq"
}
},
"wo_lu_idx": {},
"lu_idx": {},
"simple": {}
}
}
Expand Down Expand Up @@ -461,6 +498,31 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo
return executor, sbc1, sbc2, sbclookup
}

func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) {
cell := "aa"
hc := discovery.NewFakeLegacyHealthCheck()
s := createSandbox("TestExecutor")
s.VSchema = vschema
serv := newSandboxForCells([]string{cell})
resolver := newTestLegacyResolver(hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
sbcs := []*sandboxconn.SandboxConn{}
for _, shard := range shards {
sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
if values != nil {
sbc.SetResults(values)
}
sbcs = append(sbcs, sbc)
}

createSandbox(KsTestUnsharded)
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false)
return executor, sbcs[0], sbcs[1], sbclookup
}

func executorExecSession(executor *Executor, sql string, bv map[string]*querypb.BindVariable, session *vtgatepb.Session) (*sqltypes.Result, error) {
return executor.Execute(
context.Background(),
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ func TestExecutorShow(t *testing.T) {
buildVarCharRow("TestExecutor", "name_lastname_keyspace_id_map", "lookup", "from=name,lastname; table=name_lastname_keyspace_id_map; to=keyspace_id", "user2"),
buildVarCharRow("TestExecutor", "name_user_map", "lookup_hash", "from=name; table=name_user_map; to=user_id", "user"),
buildVarCharRow("TestExecutor", "t1_lkp_vdx", "consistent_lookup_unique", "from=unq_col; table=t1_lkp_idx; to=keyspace_id", "t1"),
buildVarCharRow("TestExecutor", "t2_wo_lu_vdx", "lookup_unique", "from=wo_lu_col; table=wo_lu_idx; to=keyspace_id; write_only=true", "t2_wo_lookup"),
},
}
utils.MustMatch(t, wantqr, qr, query)
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func getDMLRouting(where *sqlparser.Where, table *vindexes.Table) (engine.DMLOpc
opcode := engine.Equal
if pv.IsList() {
opcode = engine.In
} else if lu, isLu := single.(vindexes.LookupBackfill); isLu && lu.IsBackfilling() {
// Checking if the Vindex is currently backfilling or not, if it isn't we can read from the vindex table
// and we will be able to do a delete equal. Otherwise, we continue to look for next best vindex.
continue
}
return opcode, ksidVindex, ksidCol, single, []sqltypes.PlanValue{pv}, nil
}
Expand Down
Loading