Skip to content

Commit

Permalink
Merge pull request #9543 from planetscale/refactor-routing
Browse files Browse the repository at this point in the history
Refactor routing primitive
  • Loading branch information
harshit-gangal authored Jan 22, 2022
2 parents 9e529b3 + d52155e commit 3013b52
Show file tree
Hide file tree
Showing 48 changed files with 2,481 additions and 2,715 deletions.
91 changes: 43 additions & 48 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

164 changes: 20 additions & 144 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,25 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var _ Primitive = (*Delete)(nil)

// Delete represents the instructions to perform a delete.
type Delete struct {
DML
*DML

// Delete does not take inputs
noInputs
}

var delName = map[DMLOpcode]string{
Unsharded: "DeleteUnsharded",
Equal: "DeleteEqual",
In: "DeleteIn",
Scatter: "DeleteScatter",
ByDestination: "DeleteByDestination",
}

// RouteType returns a description of the query routing type used by the primitive
func (del *Delete) RouteType() string {
return delName[del.Opcode]
return del.Opcode.String()
}

// GetKeyspaceName specifies the Keyspace that this primitive routes to.
Expand All @@ -77,25 +66,25 @@ func (del *Delete) TryExecute(vcursor VCursor, bindVars map[string]*querypb.Bind
defer cancel()
}

rss, _, err := del.findRoute(vcursor, bindVars)
if err != nil {
return nil, err
}
err = allowOnlyPrimary(rss...)
if err != nil {
return nil, err
}

switch del.Opcode {
case Unsharded:
return del.execDeleteUnsharded(vcursor, bindVars)
return del.execUnsharded(vcursor, bindVars, rss)
case Equal:
switch del.Vindex.(type) {
case vindexes.MultiColumn:
return del.execDeleteEqualMultiCol(vcursor, bindVars)
default:
return del.execDeleteEqual(vcursor, bindVars)
}
case In:
return del.execDeleteIn(vcursor, bindVars)
case Scatter:
return del.execDeleteByDestination(vcursor, bindVars, key.DestinationAllShards{})
case ByDestination:
return del.execDeleteByDestination(vcursor, bindVars, del.TargetDestination)
return del.execEqual(vcursor, bindVars, rss, del.deleteVindexEntries)
case IN, Scatter, ByDestination:
return del.execMultiDestination(vcursor, bindVars, rss, del.deleteVindexEntries)
default:
// Unreachable.
return nil, fmt.Errorf("unsupported opcode: %v", del)
return nil, fmt.Errorf("unsupported opcode: %v", del.Opcode)
}
}

Expand All @@ -113,126 +102,13 @@ func (del *Delete) GetFields(VCursor, map[string]*querypb.BindVariable) (*sqltyp
return nil, fmt.Errorf("BUG: unreachable code for %q", del.Query)
}

func (del *Delete) execDeleteUnsharded(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{key.DestinationAllShards{}})
if err != nil {
return nil, err
}
if len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot send query to multiple shards for un-sharded database: %v", rss)
}
err = allowOnlyPrimary(rss...)
if err != nil {
return nil, err
}
return execShard(vcursor, del.Query, bindVars, rss[0], true, true /* canAutocommit */)
}

func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
env := evalengine.EnvWithBindVars(bindVars)
key, err := env.Evaluate(del.Values[0])
if err != nil {
return nil, err
}
rs, ksid, err := resolveSingleShard(vcursor, del.Vindex.(vindexes.SingleColumn), del.Keyspace, key.Value())
if err != nil {
return nil, err
}
err = allowOnlyPrimary(rs)
if err != nil {
return nil, err
}

if len(ksid) == 0 {
return &sqltypes.Result{}, nil
}
if del.OwnedVindexQuery != "" {
err = del.deleteVindexEntries(vcursor, bindVars, []*srvtopo.ResolvedShard{rs})
if err != nil {
return nil, err
}
}
return execShard(vcursor, del.Query, bindVars, rs, true /* rollbackOnError */, true /* canAutocommit */)
}

func (del *Delete) execDeleteEqualMultiCol(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
env := evalengine.EnvWithBindVars(bindVars)
var rowValue []sqltypes.Value
for _, rvalue := range del.Values {
v, err := env.Evaluate(rvalue)
if err != nil {
return nil, err
}
rowValue = append(rowValue, v.Value())
}
rss, _, err := resolveShardsMultiCol(vcursor, del.Vindex.(vindexes.MultiColumn), del.Keyspace, [][]sqltypes.Value{rowValue}, false /* shardIdsNeeded */)
if err != nil {
return nil, err
}
if len(rss) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex mapped id to multi shards: %d", len(rss))
}
err = allowOnlyPrimary(rss...)
if err != nil {
return nil, err
}
if del.OwnedVindexQuery != "" {
err = del.deleteVindexEntries(vcursor, bindVars, rss)
if err != nil {
return nil, err
}
}
return execShard(vcursor, del.Query, bindVars, rss[0], true /* rollbackOnError */, true /* canAutocommit */)
}

func (del *Delete) execDeleteIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
rss, queries, err := resolveMultiValueShards(vcursor, del.Keyspace, del.Query, bindVars, del.Values, del.Vindex)
if err != nil {
return nil, err
}
err = allowOnlyPrimary(rss...)
if err != nil {
return nil, err
}

if del.OwnedVindexQuery != "" {
if err := del.deleteVindexEntries(vcursor, bindVars, rss); err != nil {
return nil, err
}
}
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{dest})
if err != nil {
return nil, err
}
err = allowOnlyPrimary(rss...)
if err != nil {
return nil, err
}

queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: del.Query,
BindVariables: bindVars,
}
}
if len(del.Table.Owned) > 0 {
err = del.deleteVindexEntries(vcursor, bindVars, rss)
if err != nil {
return nil, err
}
}
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

// deleteVindexEntries performs an delete if table owns vindex.
// Note: the commit order may be different from the DML order because it's possible
// for DMLs to reuse existing transactions.
func (del *Delete) deleteVindexEntries(vcursor VCursor, bindVars map[string]*querypb.BindVariable, rss []*srvtopo.ResolvedShard) error {
if del.OwnedVindexQuery == "" {
return nil
}
queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{Sql: del.OwnedVindexQuery, BindVariables: bindVars}
Expand Down Expand Up @@ -290,7 +166,7 @@ func (del *Delete) description() PrimitiveDescription {
}
}

func addFieldsIfNotEmpty(dml DML, other map[string]interface{}) {
func addFieldsIfNotEmpty(dml *DML, other map[string]interface{}) {
if dml.Vindex != nil {
other["Vindex"] = dml.Vindex.String()
}
Expand Down
Loading

0 comments on commit 3013b52

Please sign in to comment.