Skip to content

Commit

Permalink
Merge pull request #7941 from planetscale/planner-olap
Browse files Browse the repository at this point in the history
Plan StreamExecute Queries
  • Loading branch information
harshit-gangal authored Apr 29, 2021
2 parents 5449dd6 + 3165984 commit 9ee1e9c
Show file tree
Hide file tree
Showing 17 changed files with 635 additions and 391 deletions.
8 changes: 6 additions & 2 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func ASTToStatementType(stmt Statement) StatementType {
return StmtFlush
case *CallProc:
return StmtCallProc
case *Stream:
return StmtStream
case *VStream:
return StmtVStream
default:
return StmtUnknown
}
Expand All @@ -117,7 +121,7 @@ func ASTToStatementType(stmt Statement) StatementType {
//CanNormalize takes Statement and returns if the statement can be normalized.
func CanNormalize(stmt Statement) bool {
switch stmt.(type) {
case *Select, *Union, *Insert, *Update, *Delete, *Set, *CallProc: // TODO: we could merge this logic into ASTrewriter
case *Select, *Union, *Insert, *Update, *Delete, *Set, *CallProc, *Stream: // TODO: we could merge this logic into ASTrewriter
return true
}
return false
Expand All @@ -127,7 +131,7 @@ func CanNormalize(stmt Statement) bool {
func CachePlan(stmt Statement) bool {
switch stmt.(type) {
case *Select, *Union, *ParenSelect,
*Insert, *Update, *Delete:
*Insert, *Update, *Delete, *Stream:
return true
}
return false
Expand Down
38 changes: 38 additions & 0 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 14 additions & 10 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,27 @@ package engine

import (
"bytes"
"context"
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"

"vitess.io/vitess/go/test/utils"

"vitess.io/vitess/go/vt/vtgate/vindexes"

"golang.org/x/sync/errgroup"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/sqlparser"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vtgate/vindexes"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

Expand All @@ -57,6 +53,14 @@ type noopVCursor struct {
ctx context.Context
}

func (t *noopVCursor) VStream(rss []*srvtopo.ResolvedShard, filter *binlogdatapb.Filter, gtid string, callback func(evs []*binlogdatapb.VEvent) error) error {
panic("implement me")
}

func (t *noopVCursor) MessageStream(rss []*srvtopo.ResolvedShard, tableName string, callback func(*sqltypes.Result) error) error {
panic("implement me")
}

func (t *noopVCursor) KeyspaceAvailable(ks string) bool {
panic("implement me")
}
Expand Down
88 changes: 88 additions & 0 deletions go/vt/vtgate/engine/mstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package engine

import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

var _ Primitive = (*MStream)(nil)

// MStream is an operator for message streaming from specific keyspace, destination
type MStream struct {
// Keyspace specifies the keyspace to stream messages from
Keyspace *vindexes.Keyspace

// TargetDestination specifies an explicit target destination to stream messages from
TargetDestination key.Destination

// TableName specifies the table on which stream will be executed.
TableName string

noTxNeeded

noInputs
}

// RouteType implements the Primitive interface
func (m *MStream) RouteType() string {
return "MStream"
}

// GetKeyspaceName implements the Primitive interface
func (m *MStream) GetKeyspaceName() string {
return m.Keyspace.Name
}

// GetTableName implements the Primitive interface
func (m *MStream) GetTableName() string {
return m.TableName
}

// Execute implements the Primitive interface
func (m *MStream) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] 'Execute' called for Stream")
}

// StreamExecute implements the Primitive interface
func (m *MStream) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
rss, _, err := vcursor.ResolveDestinations(m.Keyspace.Name, nil, []key.Destination{m.TargetDestination})
if err != nil {
return err
}
return vcursor.MessageStream(rss, m.TableName, callback)
}

// GetFields implements the Primitive interface
func (m *MStream) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] 'GetFields' called for Stream")
}

func (m *MStream) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "MStream",
Keyspace: m.Keyspace,
TargetDestination: m.TargetDestination,

Other: map[string]interface{}{"Table": m.TableName},
}
}
14 changes: 8 additions & 6 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,21 @@ limitations under the License.
package engine

import (
"context"
"encoding/json"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vtgate/vindexes"

"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/vt/sqlparser"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vtgate/vindexes"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)
Expand Down Expand Up @@ -104,6 +102,10 @@ type (

// KeyspaceAvailable returns true when a keyspace is visible from vtgate
KeyspaceAvailable(ks string) bool

MessageStream(rss []*srvtopo.ResolvedShard, tableName string, callback func(*sqltypes.Result) error) error

VStream(rss []*srvtopo.ResolvedShard, filter *binlogdatapb.Filter, gtid string, callback func(evs []*binlogdatapb.VEvent) error) error
}

//SessionActions gives primitives ability to interact with the session state
Expand Down
Loading

0 comments on commit 9ee1e9c

Please sign in to comment.