Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up Primitive interface implementations #8901

Merged
merged 4 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (c *Concatenate) Inputs() []Primitive {
return c.Sources
}

func (c *Concatenate) Description() PrimitiveDescription {
func (c *Concatenate) description() PrimitiveDescription {
return PrimitiveDescription{OperatorType: c.RouteType()}
}

Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (c *DBDDL) GetTableName() string {
return ""
}

// Execute implements the Primitive interface
// TryExecute implements the Primitive interface
func (c *DBDDL) TryExecute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
name := vcursor.GetDBDDLPluginName()
plugin, ok := databaseCreatorPlugins[name]
Expand Down Expand Up @@ -180,7 +180,7 @@ func (c *DBDDL) dropDatabase(vcursor VCursor, plugin DBDDLPlugin) (*sqltypes.Res
return &sqltypes.Result{StatusFlags: sqltypes.ServerStatusDbDropped}, nil
}

// StreamExecute implements the Primitive interface
// TryStreamExecute implements the Primitive interface
func (c *DBDDL) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := c.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
Expand All @@ -195,7 +195,7 @@ func (c *DBDDL) GetFields(VCursor, map[string]*querypb.BindVariable) (*sqltypes.
}

// Description implements the Primitive interface
systay marked this conversation as resolved.
Show resolved Hide resolved
func (c *DBDDL) Description() PrimitiveDescription {
func (c *DBDDL) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: strings.ToUpper(c.RouteType()),
Keyspace: &vindexes.Keyspace{Name: c.name},
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type DDL struct {
noInputs
}

func (ddl *DDL) Description() PrimitiveDescription {
func (ddl *DDL) description() PrimitiveDescription {
other := map[string]interface{}{
"Query": ddl.SQL,
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func (ddl *DDL) isOnlineSchemaDDL() bool {
return false
}

// Execute implements the Primitive interface
// TryExecute implements the Primitive interface
func (ddl *DDL) TryExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
if ddl.CreateTempTable {
vcursor.Session().HasCreatedTempTable()
Expand Down Expand Up @@ -113,7 +113,7 @@ func (ddl *DDL) TryExecute(vcursor VCursor, bindVars map[string]*query.BindVaria
}
}

// StreamExecute implements the Primitive interface
// TryStreamExecute implements the Primitive interface
func (ddl *DDL) TryStreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := ddl.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (del *Delete) GetTableName() string {
return ""
}

// Execute performs a non-streaming exec.
// TryExecute performs a non-streaming exec.
func (del *Delete) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
if del.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond)
Expand All @@ -92,7 +92,7 @@ func (del *Delete) TryExecute(vcursor VCursor, bindVars map[string]*querypb.Bind
}
}

// StreamExecute performs a streaming exec.
// TryStreamExecute performs a streaming exec.
func (del *Delete) TryStreamExecute(VCursor, map[string]*querypb.BindVariable, bool, func(*sqltypes.Result) error) error {
return fmt.Errorf("query %q cannot be used for streaming", del.Query)
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func (del *Delete) deleteVindexEntries(vcursor VCursor, bindVars map[string]*que
return nil
}

func (del *Delete) Description() PrimitiveDescription {
func (del *Delete) description() PrimitiveDescription {
other := map[string]interface{}{
"Query": del.Query,
"Table": del.GetTableName(),
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newProbeTable() *probeTable {
return &probeTable{m: map[int64][]row{}}
}

// Execute implements the Primitive interface
// TryExecute implements the Primitive interface
func (d *Distinct) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
input, err := vcursor.ExecutePrimitive(d.Source, bindVars, wantfields)
if err != nil {
Expand All @@ -115,7 +115,7 @@ func (d *Distinct) TryExecute(vcursor VCursor, bindVars map[string]*querypb.Bind
return result, err
}

// StreamExecute implements the Primitive interface
// TryStreamExecute implements the Primitive interface
func (d *Distinct) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
pt := newProbeTable()

Expand Down Expand Up @@ -169,7 +169,7 @@ func (d *Distinct) Inputs() []Primitive {
return []Primitive{d.Source}
}

func (d *Distinct) Description() PrimitiveDescription {
func (d *Distinct) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "Distinct",
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/fake_primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,6 @@ func wrapStreamExecute(prim Primitive, vcursor VCursor, bindVars map[string]*que
return result, err
}

func (f *fakePrimitive) Description() PrimitiveDescription {
func (f *fakePrimitive) description() PrimitiveDescription {
return PrimitiveDescription{OperatorType: "fake"}
}
7 changes: 4 additions & 3 deletions go/vt/vtgate/engine/gen4_compare_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ func (gc *Gen4CompareV3) Inputs() []Primitive {
}

// Description implements the Primitive interface
func (gc *Gen4CompareV3) Description() PrimitiveDescription {
return gc.Gen4.Description()
func (gc *Gen4CompareV3) description() PrimitiveDescription {
return PrimitiveDescription{OperatorType: "Gen4CompareV3"}
}

func CompareV3AndGen4Errors(v3Err error, gen4Err error) error {
// CompareV3AndGen4Errors compares the two errors, and if they don't match, produces an error
func CompareV3AndGen4Errors(v3Err, gen4Err error) error {
if v3Err != nil && gen4Err != nil {
if v3Err.Error() == gen4Err.Error() {
return gen4Err
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ func (ins *Insert) GetTableName() string {
return ""
}

// Execute performs a non-streaming exec.
func (ins *Insert) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
// TryExecute performs a non-streaming exec.
func (ins *Insert) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
if ins.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(ins.QueryTimeout) * time.Millisecond)
defer cancel()
Expand All @@ -205,13 +205,13 @@ func (ins *Insert) TryExecute(vcursor VCursor, bindVars map[string]*querypb.Bind
}
}

// StreamExecute performs a streaming exec.
func (ins *Insert) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
// TryStreamExecute performs a streaming exec.
func (ins *Insert) TryStreamExecute(VCursor, map[string]*querypb.BindVariable, bool, func(*sqltypes.Result) error) error {
return fmt.Errorf("query %q cannot be used for streaming", ins.Query)
}

// GetFields fetches the field info.
func (ins *Insert) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
func (ins *Insert) GetFields(VCursor, map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unreachable code for %q", ins.Query)
}

Expand Down Expand Up @@ -615,7 +615,7 @@ func InsertVarName(col sqlparser.ColIdent, rowNum int) string {
return fmt.Sprintf("_%s_%d", col.CompliantName(), rowNum)
}

func (ins *Insert) Description() PrimitiveDescription {
func (ins *Insert) description() PrimitiveDescription {
other := map[string]interface{}{
"Query": ins.Query,
"TableName": ins.GetTableName(),
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Join struct {
Vars map[string]int `json:",omitempty"`
}

// Execute performs a non-streaming exec.
// TryExecute performs a non-streaming exec.
func (jn *Join) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
joinVars := make(map[string]*querypb.BindVariable)
lresult, err := vcursor.ExecutePrimitive(jn.Left, bindVars, wantfields)
Expand Down Expand Up @@ -92,7 +92,7 @@ func (jn *Join) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVar
return result, nil
}

// StreamExecute performs a streaming exec.
// TryStreamExecute performs a streaming exec.
func (jn *Join) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
joinVars := make(map[string]*querypb.BindVariable)
err := vcursor.StreamExecutePrimitive(jn.Left, bindVars, wantfields, func(lresult *sqltypes.Result) error {
Expand Down Expand Up @@ -257,7 +257,7 @@ func combineVars(bv1, bv2 map[string]*querypb.BindVariable) map[string]*querypb.
return out
}

func (jn *Join) Description() PrimitiveDescription {
func (jn *Join) description() PrimitiveDescription {
other := map[string]interface{}{
"TableName": jn.GetTableName(),
"JoinColumnIndexes": strings.Trim(strings.Join(strings.Fields(fmt.Sprint(jn.Cols)), ","), "[]"),
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (l *Limit) GetTableName() string {
return l.Input.GetTableName()
}

// Execute satisfies the Primtive interface.
// TryExecute satisfies the Primitive interface.
func (l *Limit) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
count, err := l.fetchCount(bindVars)
if err != nil {
Expand Down Expand Up @@ -85,7 +85,7 @@ func (l *Limit) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVar
return result, nil
}

// StreamExecute satisfies the Primtive interface.
// TryStreamExecute satisfies the Primitive interface.
func (l *Limit) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
count, err := l.fetchCount(bindVars)
if err != nil {
Expand Down Expand Up @@ -206,7 +206,7 @@ func (l *Limit) fetchOffset(bindVars map[string]*querypb.BindVariable) (int, err
return offset, nil
}

func (l *Limit) Description() PrimitiveDescription {
func (l *Limit) description() PrimitiveDescription {
other := map[string]interface{}{}

if !l.Count.IsNull() {
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (l *Lock) GetTableName() string {
return "dual"
}

// Execute is part of the Primitive interface
// TryExecute is part of the Primitive interface
func (l *Lock) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(l.Keyspace.Name, nil, []key.Destination{l.TargetDestination})
if err != nil {
Expand All @@ -75,7 +75,7 @@ func (l *Lock) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVari
return vcursor.ExecuteLock(rss[0], query)
}

// StreamExecute is part of the Primitive interface
// TryStreamExecute is part of the Primitive interface
func (l *Lock) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
qr, err := l.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
Expand All @@ -89,7 +89,7 @@ func (l *Lock) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVaria
return nil, vterrors.New(vtrpc.Code_UNIMPLEMENTED, "not implements in lock primitive")
}

func (l *Lock) Description() PrimitiveDescription {
func (l *Lock) description() PrimitiveDescription {
other := map[string]interface{}{
"Query": l.Query,
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/memory_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (ms *MemorySort) SetTruncateColumnCount(count int) {
ms.TruncateColumnCount = count
}

// Execute satisfies the Primitive interface.
// TryExecute satisfies the Primitive interface.
func (ms *MemorySort) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
count, err := ms.fetchCount(bindVars)
if err != nil {
Expand All @@ -90,7 +90,7 @@ func (ms *MemorySort) TryExecute(vcursor VCursor, bindVars map[string]*querypb.B
return result.Truncate(ms.TruncateColumnCount), nil
}

// StreamExecute satisfies the Primitive interface.
// TryStreamExecute satisfies the Primitive interface.
func (ms *MemorySort) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
count, err := ms.fetchCount(bindVars)
if err != nil {
Expand Down Expand Up @@ -176,7 +176,7 @@ func (ms *MemorySort) fetchCount(bindVars map[string]*querypb.BindVariable) (int
return count, nil
}

func (ms *MemorySort) Description() PrimitiveDescription {
func (ms *MemorySort) description() PrimitiveDescription {
orderByIndexes := GenericJoin(ms.OrderBy, orderByParamsToString)
value := ms.UpperLimit.Value
other := map[string]interface{}{"OrderBy": orderByIndexes}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (ms *MergeSort) GetKeyspaceName() string { return "" }
// GetTableName satisfies Primitive.
func (ms *MergeSort) GetTableName() string { return "" }

// Execute is not supported.
// TryExecute is not supported.
func (ms *MergeSort) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] Execute is not reachable")
}
Expand All @@ -74,7 +74,7 @@ func (ms *MergeSort) GetFields(vcursor VCursor, bindVars map[string]*querypb.Bin
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] GetFields is not reachable")
}

// StreamExecute performs a streaming exec.
// TryStreamExecute performs a streaming exec.
func (ms *MergeSort) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
ctx, cancel := context.WithCancel(vcursor.Context())
defer cancel()
Expand Down Expand Up @@ -208,7 +208,7 @@ func (ms *MergeSort) getStreamingFields(handles []*streamHandle, callback func(*
return nil
}

func (ms *MergeSort) Description() PrimitiveDescription {
func (ms *MergeSort) description() PrimitiveDescription {
other := map[string]interface{}{
"OrderBy": ms.OrderBy,
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/mstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func (m *MStream) GetTableName() string {
return m.TableName
}

// Execute implements the Primitive interface
// TryExecute implements the Primitive interface
func (m *MStream) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] 'Execute' called for Stream")
}

// StreamExecute implements the Primitive interface
// TryStreamExecute implements the Primitive interface
func (m *MStream) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
rss, _, err := vcursor.ResolveDestinations(m.Keyspace.Name, nil, []key.Destination{m.TargetDestination})
if err != nil {
Expand All @@ -77,7 +77,7 @@ func (m *MStream) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVa
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] 'GetFields' called for Stream")
}

func (m *MStream) Description() PrimitiveDescription {
func (m *MStream) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "MStream",
Keyspace: m.Keyspace,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type OnlineDDL struct {
noInputs
}

func (v *OnlineDDL) Description() PrimitiveDescription {
func (v *OnlineDDL) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "OnlineDDL",
Keyspace: v.Keyspace,
Expand All @@ -71,7 +71,7 @@ func (v *OnlineDDL) GetTableName() string {
return v.DDL.GetTable().Name.String()
}

// Execute implements the Primitive interface
// TryExecute implements the Primitive interface
func (v *OnlineDDL) TryExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
result = &sqltypes.Result{
Fields: []*querypb.Field{
Expand Down Expand Up @@ -114,7 +114,7 @@ func (v *OnlineDDL) TryExecute(vcursor VCursor, bindVars map[string]*query.BindV
return result, err
}

//StreamExecute implements the Primitive interface
// TryStreamExecute implements the Primitive interface
func (v *OnlineDDL) TryStreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := v.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (oa *OrderedAggregate) execute(vcursor VCursor, bindVars map[string]*queryp
return out, nil
}

// StreamExecute is a Primitive function.
// TryStreamExecute is a Primitive function.
func (oa *OrderedAggregate) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var current []sqltypes.Value
var curDistincts []sqltypes.Value
Expand Down Expand Up @@ -505,7 +505,7 @@ func groupByParamsToString(i interface{}) string {
return i.(*GroupByParams).String()
}

func (oa *OrderedAggregate) Description() PrimitiveDescription {
func (oa *OrderedAggregate) description() PrimitiveDescription {
aggregates := GenericJoin(oa.Aggregates, aggregateParamsToString)
groupBy := GenericJoin(oa.GroupByKeys, groupByParamsToString)
other := map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/plan_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func marshalAdd(prepend string, buf *bytes.Buffer, name string, obj interface{})

// PrimitiveToPlanDescription transforms a primitive tree into a corresponding PlanDescription tree
func PrimitiveToPlanDescription(in Primitive) PrimitiveDescription {
this := in.Description()
this := in.description()

for _, input := range in.Inputs() {
this.Inputs = append(this.Inputs, PrimitiveToPlanDescription(input))
Expand Down
Loading