From d0a9458b6ec145b84abe7628e5f41875d304cf8a Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 1 Sep 2020 15:33:49 +0200 Subject: [PATCH 1/3] make offset work in OLAP mode Signed-off-by: Andres Taylor --- go/test/endtoend/vtgate/misc_test.go | 12 ++++++++ go/vt/vtgate/engine/limit.go | 32 ++++++++++++++++----- go/vt/vtgate/engine/limit_test.go | 43 ++++++++++++++++++++++++++++ go/vt/vtgate/planbuilder/limit.go | 15 +++------- 4 files changed, 84 insertions(+), 18 deletions(-) diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 6d5f1593447..bf76b42092b 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -319,6 +319,18 @@ func TestInformationSchemaQuery(t *testing.T) { assert.Equal(t, "vt_ks", qr.Rows[0][0].ToString()) } +func TestOffsetAndLimitWithOLAP(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + exec(t, conn, "insert into t1(id1, id2) values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)") + assertMatches(t, conn, "select id1 from t1 order by id1 limit 3 offset 2", "[[INT64(3)] [INT64(4)] [INT64(5)]]") + exec(t, conn, "set workload='olap'") + assertMatches(t, conn, "select id1 from t1 order by id1 limit 3 offset 2", "[[INT64(3)] [INT64(4)] [INT64(5)]]") +} + func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { t.Helper() qr := exec(t, conn, query) diff --git a/go/vt/vtgate/engine/limit.go b/go/vt/vtgate/engine/limit.go index c43b6a4b28f..5cfe8d7dbc9 100644 --- a/go/vt/vtgate/engine/limit.go +++ b/go/vt/vtgate/engine/limit.go @@ -94,8 +94,9 @@ func (l *Limit) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.Bind if err != nil { return err } - if !l.Offset.IsNull() { - return fmt.Errorf("offset not supported for stream execute queries") + offset, err := l.fetchOffset(bindVars) + if err != nil { + return err } bindVars["__upper_limit"] = sqltypes.Int64BindVariable(int64(count)) @@ -106,19 +107,31 @@ func (l *Limit) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.Bind return err } } - if len(qr.Rows) == 0 { + inputSize := len(qr.Rows) + if inputSize == 0 { return nil } + // we've still not seen all rows we need to see before we can return anything to the client + if offset > 0 { + if inputSize <= offset { + // not enough to return anything yet + offset -= inputSize + return nil + } + qr.Rows = qr.Rows[offset:] + offset = 0 + } + if count == 0 { - // Unreachable: this is just a failsafe. return io.EOF } // reduce count till 0. result := &sqltypes.Result{Rows: qr.Rows} - if count > len(result.Rows) { - count -= len(result.Rows) + resultSize := len(result.Rows) + if count > resultSize { + count -= resultSize return callback(result) } result.Rows = result.Rows[:count] @@ -140,7 +153,7 @@ func (l *Limit) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.Bind return nil } -// GetFields satisfies the Primtive interface. +// GetFields implements the Primitive interface. func (l *Limit) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { return l.Input.GetFields(vcursor, bindVars) } @@ -150,11 +163,16 @@ func (l *Limit) Inputs() []Primitive { return []Primitive{l.Input} } +//NeedsTransaction implements the Primitive interface. func (l *Limit) NeedsTransaction() bool { return l.Input.NeedsTransaction() } func (l *Limit) fetchCount(bindVars map[string]*querypb.BindVariable) (int, error) { + if l.Count.IsNull() { + return 0, nil + } + resolved, err := l.Count.ResolveValue(bindVars) if err != nil { return 0, err diff --git a/go/vt/vtgate/engine/limit_test.go b/go/vt/vtgate/engine/limit_test.go index e0dd4ddadaf..d7eb5077e71 100644 --- a/go/vt/vtgate/engine/limit_test.go +++ b/go/vt/vtgate/engine/limit_test.go @@ -432,6 +432,49 @@ func TestLimitStreamExecute(t *testing.T) { } } +func TestOffsetStreamExecute(t *testing.T) { + bindVars := make(map[string]*querypb.BindVariable) + fields := sqltypes.MakeTestFields( + "col1|col2", + "int64|varchar", + ) + inputResult := sqltypes.MakeTestResult( + fields, + "a|1", + "b|2", + "c|3", + "d|4", + "e|5", + "f|6", + ) + fp := &fakePrimitive{ + results: []*sqltypes.Result{inputResult}, + } + + l := &Limit{ + Offset: int64PlanValue(2), + Count: int64PlanValue(3), + Input: fp, + } + + var results []*sqltypes.Result + err := l.StreamExecute(nil, bindVars, false, func(qr *sqltypes.Result) error { + results = append(results, qr) + return nil + }) + require.NoError(t, err) + wantResults := sqltypes.MakeTestStreamingResults( + fields, + "c|3", + "d|4", + "---", + "e|5", + ) + if !reflect.DeepEqual(results, wantResults) { + t.Errorf("l.StreamExecute:\n%s, want\n%s", sqltypes.PrintResults(results), sqltypes.PrintResults(wantResults)) + } +} + func TestLimitGetFields(t *testing.T) { result := sqltypes.MakeTestResult( sqltypes.MakeTestFields( diff --git a/go/vt/vtgate/planbuilder/limit.go b/go/vt/vtgate/planbuilder/limit.go index 74fdc754188..796850b3b18 100644 --- a/go/vt/vtgate/planbuilder/limit.go +++ b/go/vt/vtgate/planbuilder/limit.go @@ -18,11 +18,9 @@ package planbuilder import ( "errors" - "fmt" - - "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" ) @@ -93,17 +91,12 @@ func (l *limit) SetLimit(limit *sqlparser.Limit) error { } l.elimit.Count = pv - switch offset := limit.Offset.(type) { - case *sqlparser.Literal: - pv, err = sqlparser.NewPlanValue(offset) + if limit.Offset != nil { + pv, err = sqlparser.NewPlanValue(limit.Offset) if err != nil { - return err + return vterrors.Wrap(err, "unexpected expression in OFFSET") } l.elimit.Offset = pv - case nil: - // NOOP - default: - return fmt.Errorf("unexpected expression in LIMIT: %v", sqlparser.String(limit)) } l.input.SetUpperLimit(sqlparser.NewArgument([]byte(":__upper_limit"))) From 2869b3e953ffee9243dbf1f1837b355c06e9fca8 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 1 Sep 2020 22:13:59 +0530 Subject: [PATCH 2/3] pass limit+offset to each shard to have enough records to work on offset inmemory Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/limit.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/engine/limit.go b/go/vt/vtgate/engine/limit.go index 5cfe8d7dbc9..e712c182dc5 100644 --- a/go/vt/vtgate/engine/limit.go +++ b/go/vt/vtgate/engine/limit.go @@ -99,7 +99,9 @@ func (l *Limit) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.Bind return err } - bindVars["__upper_limit"] = sqltypes.Int64BindVariable(int64(count)) + // When offset is present, we hijack the limit value so we can calculate + // the offset in memory from the result of the scatter query with count + offset. + bindVars["__upper_limit"] = sqltypes.Int64BindVariable(int64(count + offset)) err = l.Input.StreamExecute(vcursor, bindVars, wantfields, func(qr *sqltypes.Result) error { if len(qr.Fields) != 0 { From a813ef2605e4425fbf14bba17b1442968f6e5e8c Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 2 Sep 2020 13:45:17 +0530 Subject: [PATCH 3/3] executor unit test with limit offset Signed-off-by: Harshit Gangal --- go/vt/vtgate/executor_select_test.go | 72 ++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index cbdb1362754..a0ffb4533ed 100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -221,6 +221,78 @@ func TestStreamBuffering(t *testing.T) { } } +func TestStreamLimitOffset(t *testing.T) { + executor, sbc1, sbc2, _ := createLegacyExecutorEnv() + + // This test is similar to TestStreamUnsharded except that it returns a Result > 10 bytes, + // such that the splitting of the Result into multiple Result responses gets tested. + sbc1.SetResults([]*sqltypes.Result{{ + Fields: []*querypb.Field{ + {Name: "id", Type: sqltypes.Int32}, + {Name: "textcol", Type: sqltypes.VarChar}, + }, + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt32(1), + sqltypes.NewVarChar("1234"), + }, { + sqltypes.NewInt32(4), + sqltypes.NewVarChar("4567"), + }}, + }}) + + sbc2.SetResults([]*sqltypes.Result{{ + Fields: []*querypb.Field{ + {Name: "id", Type: sqltypes.Int32}, + {Name: "textcol", Type: sqltypes.VarChar}, + }, + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt32(2), + sqltypes.NewVarChar("2345"), + }}, + }}) + + results := make(chan *sqltypes.Result, 10) + err := executor.StreamExecute( + context.Background(), + "TestStreamLimitOffset", + NewSafeSession(masterSession), + "select id, textcol from user order by id limit 2 offset 2", + nil, + querypb.Target{ + TabletType: topodatapb.TabletType_MASTER, + }, + func(qr *sqltypes.Result) error { + results <- qr + return nil + }, + ) + close(results) + require.NoError(t, err) + wantResult := &sqltypes.Result{ + Fields: []*querypb.Field{ + {Name: "id", Type: sqltypes.Int32}, + {Name: "textcol", Type: sqltypes.VarChar}, + }, + + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt32(1), + sqltypes.NewVarChar("1234"), + }, { + sqltypes.NewInt32(1), + sqltypes.NewVarChar("foo"), + }}, + } + var gotResults []*sqltypes.Result + for r := range results { + gotResults = append(gotResults, r) + } + res := gotResults[0] + for i := 1; i < len(gotResults); i++ { + res.Rows = append(res.Rows, gotResults[i].Rows...) + } + utils.MustMatch(t, wantResult, res, "") +} + func TestSelectLastInsertId(t *testing.T) { masterSession.LastInsertId = 52 executor, _, _, _ := createLegacyExecutorEnv()