sessionctx,kv,planner: add system variable for fine_grained_shuffle #35256

Merged Jul 6, 2022 · 44 commits

Changes from 3 commits

Commits (44)
d0403ca
sessionctx,kv,planner: add session variable for fine_grained_shuffle
guo-shaoge May 29, 2022
416f62d
Merge branch 'master' of github.com:pingcap/tidb into fine_grained_sh…
guo-shaoge Jun 9, 2022
bf099f3
fix bug
guo-shaoge Jun 10, 2022
a00b4dd
Merge branch 'master' of github.com:pingcap/tidb into fine_grained_sh…
guo-shaoge Jun 13, 2022
9725158
update kvproto dep
guo-shaoge Jun 20, 2022
d705703
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jun 20, 2022
ce73985
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jun 20, 2022
4dd96f0
update DEPS.bzl
guo-shaoge Jun 20, 2022
b38dd77
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tidb int…
guo-shaoge Jun 20, 2022
10fb3b0
fix
guo-shaoge Jun 20, 2022
ba85856
fix
guo-shaoge Jun 20, 2022
b8aafc7
fix sysvar
guo-shaoge Jun 20, 2022
54ca46e
fix log
guo-shaoge Jun 24, 2022
4cb8452
Merge branch 'master' of github.com:pingcap/tidb into fine_grained_sh…
guo-shaoge Jun 24, 2022
bc797f9
enable fine_grained_shuffle by fragment level instead of query level
guo-shaoge Jun 30, 2022
dc8739e
use tidb_max_tiflash_threads as stream count
guo-shaoge Jun 30, 2022
0587ac9
fix fmt
guo-shaoge Jun 30, 2022
fca0b04
trivial fix
guo-shaoge Jun 30, 2022
f75a590
add more test
guo-shaoge Jun 30, 2022
904205a
add case
guo-shaoge Jul 1, 2022
d4b2be4
fix
guo-shaoge Jul 1, 2022
be96955
add case
guo-shaoge Jul 1, 2022
e2ee539
fix basePhysicalPlan.Clone()
guo-shaoge Jul 3, 2022
ddacfa5
add comment
guo-shaoge Jul 3, 2022
fa0907c
add case TestHandleFineGrainedShuffle
guo-shaoge Jul 3, 2022
db7f2ad
update tipb
guo-shaoge Jul 4, 2022
586cabf
Merge branch 'master' of github.com:pingcap/tidb into fine_grained_sh…
guo-shaoge Jul 4, 2022
d734d7c
fix lint
guo-shaoge Jul 4, 2022
7b170ec
update case
guo-shaoge Jul 4, 2022
f502f87
refactor setupFineGrainedShuffle()
guo-shaoge Jul 4, 2022
f758c2f
fix
guo-shaoge Jul 4, 2022
b078691
add case
guo-shaoge Jul 4, 2022
04f0fca
Merge branch 'master' into fine_grained_shuffle
guo-shaoge Jul 4, 2022
3667537
fix lint
guo-shaoge Jul 4, 2022
c587938
Merge branch 'fine_grained_shuffle' of github.com:guo-shaoge/tidb int…
guo-shaoge Jul 4, 2022
e0892b5
also explain stream_count for projection/selection/window/sort
guo-shaoge Jul 4, 2022
a808fbc
update case and fix comment
guo-shaoge Jul 5, 2022
25d298c
update case
guo-shaoge Jul 5, 2022
76ee0ed
fix kvproto; addTarget() -> updateTarget()
guo-shaoge Jul 5, 2022
4ec0629
update DEPS.bzl
guo-shaoge Jul 5, 2022
4e1b3bc
remove extra comma in explain info
guo-shaoge Jul 5, 2022
874c2b3
Merge branch 'master' into fine_grained_shuffle
ti-chi-bot Jul 5, 2022
e0ad55f
Merge branch 'master' into fine_grained_shuffle
ti-chi-bot Jul 5, 2022
fbdca0e
Merge branch 'master' into fine_grained_shuffle
ti-chi-bot Jul 5, 2022
11 changes: 10 additions & 1 deletion executor/mpp_gather.go
@@ -77,7 +77,12 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
if err != nil {
return errors.Trace(err)
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs), zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()), zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()),
zap.Bool("EnableFineGrainedShuffle", pf.EnableFineGrainedShuffle),
zap.Uint32("FineGrainedShuffleStreamCount", e.ctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount),
zap.Int64("FineGrainedShuffleBatchSize", e.ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
@@ -88,6 +93,10 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
StartTs: e.startTS,
State: kv.MppTaskReady,
}
if pf.EnableFineGrainedShuffle {
req.FineGrainedShuffleStreamCount = e.ctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount
req.FineGrainedShuffleBatchSize = e.ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize
}
e.mppReqs = append(e.mppReqs, req)
}
return nil
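Note on the wire contract: the two new request fields are populated only when the fragment opted in, so their zero values double as the "disabled" signal downstream. A minimal sketch of that gating as a standalone helper; the function name is hypothetical and not part of this diff:

package executor

import "github.com/pingcap/tidb/sessionctx/variable"

// fineGrainedShuffleParams returns the values to place into an
// MPPDispatchRequest; zero values mean fine grained shuffle is off.
func fineGrainedShuffleParams(enabled bool, vars *variable.SessionVars) (streamCount uint32, batchSize int64) {
	if !enabled {
		return 0, 0
	}
	return vars.TiFlashFineGrainedShuffleStreamCount, vars.TiFlashFineGrainedShuffleBatchSize
}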
46 changes: 46 additions & 0 deletions executor/tiflashtest/tiflash_test.go
@@ -1247,3 +1247,49 @@ func TestAggPushDownCountStar(t *testing.T) {

tk.MustQuery("select count(*) from c, o where c.c_id=o.c_id").Check(testkit.Rows("5"))
}

// Window functions are not implemented in unistore, so most of this test is commented out for now.
func TestEnableFineGrainedShuffle(t *testing.T) {
store, clean := testkit.CreateMockStore(t, withMockTiFlash(2))
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int)")
tk.MustExec("alter table t1 set tiflash replica 2;")
tb := external.GetTableByName(t, tk, "test", "t1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@tidb_allow_mpp = 1")
tk.MustExec("set @@tidb_enforce_mpp = 1")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/testEnableFineGrainedShuffle", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/testEnableFineGrainedShuffle"))
}()
// // 1. Can use fine grained shuffle.
// tk.MustExec("select row_number() over w1 from t1 window w1 as (partition by c1 order by c1);")
// // Test two window functions.
// tk.MustExec("select row_number() over w1, rank() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (partition by c2);")
// // Limit + Order.
// tk.MustExec("select row_number() over w1, rank() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (partition by c2) order by 1, 2 limit 10;")

// // 2. Cannot use fine grained shuffle.
// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/testEnableFineGrainedShuffle", "return(false)"))
// // No window function, so disabled.
// tk.MustExec("select * from t1;")
// // No partition key in window function, so disabled.
// tk.MustExec("select row_number() over w1 from t1 window w1 as (order by c1);")
// // No partition by key in w2, so disabled.
// tk.MustExec("select row_number() over w1, row_number() over w2 from t1 window w1 as (partition by c1 order by c1), w2 as (order by c1);")
// // GroupBy key is the same as the window function partition key, so they are in one fragment.
// // But fine grained shuffle doesn't support group by for now.
// tk.MustExec("select row_number() over w1, count(c2) from t1 group by c1 having c1 > 10 window w1 as (partition by c1 order by c2);")
// // GroupBy key and window function partition key are not the same, and group by doesn't use fine grained shuffle.
// // Also, fine grained shuffle is query level for now; task level is not supported.
// tk.MustExec("select row_number() over w1, count(c2) from t1 group by c1 having c1 > 10 window w1 as (partition by c2 order by c2);")
// tk.MustExec("select row_number() over w1, count(c1) from t1 group by c2 having c2 > 10 window w1 as (partition by c1 order by c2);")
// // Join, same as GroupBy.
// tk.MustExec("select row_number() over w1 from t1 a join t1 b on a.c1 = b.c2 window w1 as (partition by a.c1);")
}
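Commit e0892b5 also surfaces stream_count in EXPLAIN output for projection/selection/window/sort, which is a quicker way to verify the flag than reading dispatch logs. A sketch of such a check, reusing this file's withMockTiFlash helper and assuming the variable's SQL name is tiflash_fine_grained_shuffle_stream_count and the annotation is rendered as "stream_count: 8"; both are assumptions, not confirmed by this diff:

package tiflashtest

import (
	"strings"
	"testing"

	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/testkit"
	"github.com/pingcap/tidb/testkit/external"
	"github.com/stretchr/testify/require"
)

func TestExplainShowsStreamCount(t *testing.T) {
	store, clean := testkit.CreateMockStore(t, withMockTiFlash(2))
	defer clean()
	tk := testkit.NewTestKit(t, store)
	// Same table and replica setup as TestEnableFineGrainedShuffle above.
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t1")
	tk.MustExec("create table t1(c1 int, c2 int)")
	tk.MustExec("alter table t1 set tiflash replica 2")
	tb := external.GetTableByName(t, tk, "test", "t1")
	require.NoError(t, domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true))
	tk.MustExec("set @@tidb_allow_mpp = 1")
	tk.MustExec("set @@tidb_enforce_mpp = 1")
	tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 8")

	rows := tk.MustQuery("explain select row_number() over w1 from t1 window w1 as (partition by c1 order by c1)").Rows()
	// Scan the plan text for the annotation on the ExchangeSender.
	found := false
	for _, row := range rows {
		for _, col := range row {
			if s, ok := col.(string); ok && strings.Contains(s, "stream_count: 8") {
				found = true
			}
		}
	}
	require.True(t, found, "expected a stream_count annotation in the mpp plan")
}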
2 changes: 2 additions & 0 deletions go.mod
@@ -206,3 +206,5 @@ replace github.com/pingcap/tidb/parser => ./parser

// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible

replace github.com/pingcap/kvproto => /home/guojiangtao/work/kvproto
9 changes: 0 additions & 9 deletions go.sum
@@ -285,7 +285,6 @@ github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/E
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -314,7 +313,6 @@ github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -636,10 +634,6 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167 h1:dsMpneacHyuVslSVndgUfJKrXFNG7VPdXip2ulG6glo=
github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
@@ -954,7 +948,6 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -1263,7 +1256,6 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
@@ -1332,7 +1324,6 @@ google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20220211171837-173942840c17/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/genproto v0.0.0-20220216160803-4663080d8bc8 h1:divpuJZKgX3Qt7MFDE5v62yu0yQcQbTCD9VJp9leX58=
google.golang.org/genproto v0.0.0-20220216160803-4663080d8bc8/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
10 changes: 6 additions & 4 deletions kv/mpp.go
@@ -70,10 +70,12 @@ type MPPDispatchRequest struct {
IsRoot bool // root task returns data to tidb directly.
Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed.
// SchemaVer is for any schema-ful storage (like tiflash) to validate schema correctness if necessary.
SchemaVar int64
StartTs uint64
ID int64 // identify a single task
State MppTaskStates
SchemaVar int64
StartTs uint64
ID int64 // identify a single task
State MppTaskStates
FineGrainedShuffleStreamCount uint32
FineGrainedShuffleBatchSize int64
}

// MPPClient accepts and processes mpp requests.
93 changes: 90 additions & 3 deletions planner/core/fragment.go
@@ -16,9 +16,11 @@ package core

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
@@ -46,7 +48,8 @@ type Fragment struct {

IsRoot bool

singleton bool // indicates if this is a task running on a single node.
singleton bool // indicates if this is a task running on a single node.
EnableFineGrainedShuffle bool
}

type tasksAndFrags struct {
@@ -63,14 +66,44 @@ type mppTaskGenerator struct {
}

// GenerateRootMPPTasks generates all mpp tasks and returns the root ones.
func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*Fragment, error) {
func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) (frags []*Fragment, err error) {
g := &mppTaskGenerator{
ctx: ctx,
startTS: startTs,
is: is,
cache: make(map[int]tasksAndFrags),
}
return g.generateMPPTasks(sender)
if frags, err = g.generateMPPTasks(sender); err != nil {
return
}
setupEnableFineGrainedShuffleFlag(frags)
return
}

// 1. Disable if any fragment is disabled (so fine grained shuffle is query level).
// 2. Disable if there is only one fragment.
func setupEnableFineGrainedShuffleFlag(frags []*Fragment) {
enable := true
if len(frags) == 1 {
enable = false
} else {
for _, frag := range frags {
if !frag.EnableFineGrainedShuffle {
enable = false
break
}
}
for _, frag := range frags {
frag.EnableFineGrainedShuffle = enable
}
}
failpoint.Inject("testEnableFineGrainedShuffle", func(val failpoint.Value) {
expected := val.(bool)
if expected != enable {
// panic(fmt.Sprintf("fine grained shuffle switch is wrong, expected: %v, got: %v", expected, enable))
logutil.BgLogger().Error(fmt.Sprintf("fine grained shuffle switch is wrong, expected: %v, got: %v", expected, enable))
}
})
}

func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragment, error) {
@@ -143,6 +176,59 @@ func (f *Fragment) init(p PhysicalPlan) error {
return nil
}

// Core logic to decide whether to enable fine grained shuffle.
func checkEnableFineGrainedShuffleForFragment(hasValidWindow bool, hasInvalidWindow bool, hasTableScan bool) bool {
if hasInvalidWindow {
return false
}
if hasValidWindow || hasTableScan {
return true
}
return false
}

func (f *Fragment) initFineGrainedShuffleFlag(s *PhysicalExchangeSender) {
f.EnableFineGrainedShuffle = checkEnableFineGrainedShuffleForFragment(traverseFragmentForFineGrainedShuffle(s))
}

// Valid window for fine grained shuffle:
// 1. has partition key
// 2. its child must be: window, sort or exchange receiver.
func traverseFragmentForFineGrainedShuffle(p PhysicalPlan) (hasValidWindow bool, hasInvalidWindow bool, hasTableScan bool) {
switch x := p.(type) {
case *PhysicalWindow:
if len(x.PartitionBy) <= 0 {
hasInvalidWindow = true
break
}
child := x.Children()[0]
if sort, ok := child.(*PhysicalSort); ok {
child = sort.Children()[0]
}
switch child.(type) {
case *PhysicalWindow:
hasValidWindow = true
_, hasInvalidWindow, hasTableScan = traverseFragmentForFineGrainedShuffle(child)
case *PhysicalExchangeReceiver:
hasValidWindow = true
default:
hasInvalidWindow = true
}
case *PhysicalTableScan:
hasTableScan = true
case *PhysicalExchangeReceiver:
break
default:
for _, child := range p.Children() {
hasValidWindow, hasInvalidWindow, hasTableScan = traverseFragmentForFineGrainedShuffle(child)
if !checkEnableFineGrainedShuffleForFragment(hasValidWindow, hasInvalidWindow, hasTableScan) {
break
}
}
}
return
}

// We would remove all the union-all operators by 'untwist'ing and copying the plans above union-all.
// This will make every route from root (ExchangeSender) to leaf nodes (ExchangeReceiver and TableScan)
// a new isolated tree (and also a fragment) without union all. These trees (fragments then tasks) will
@@ -215,6 +301,7 @@ func buildFragments(s *PhysicalExchangeSender) ([]*Fragment, error) {
if err != nil {
return nil, errors.Trace(err)
}
f.initFineGrainedShuffleFlag(s)
fragments = append(fragments, f)
}
return fragments, nil
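checkEnableFineGrainedShuffleForFragment reduces to a small truth table: any invalid window disables the fragment, otherwise a valid window or a table scan enables it. A quick table-driven check of that contract; the test itself is ours, not part of this PR:

package core

import "testing"

func TestCheckEnableFineGrainedShuffleForFragment(t *testing.T) {
	cases := []struct {
		validWindow, invalidWindow, tableScan, want bool
	}{
		{false, false, false, false}, // nothing relevant: disabled
		{true, false, false, true},   // a valid window enables
		{false, false, true, true},   // a bare table scan enables
		{true, true, true, false},    // any invalid window wins
	}
	for _, c := range cases {
		got := checkEnableFineGrainedShuffleForFragment(c.validWindow, c.invalidWindow, c.tableScan)
		if got != c.want {
			t.Errorf("check(%v, %v, %v) = %v, want %v", c.validWindow, c.invalidWindow, c.tableScan, got, c.want)
		}
	}
}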
1 change: 1 addition & 0 deletions planner/core/fragment_test.go
@@ -15,6 +15,7 @@
package core

import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tipb/go-tipb"
"github.com/stretchr/testify/require"

3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
@@ -1129,6 +1129,9 @@ type SessionVars struct {

// MaxAllowedPacket indicates the maximum size of a packet for the MySQL protocol.
MaxAllowedPacket uint64

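// TiFlashFineGrainedShuffleStreamCount and TiFlashFineGrainedShuffleBatchSize
// configure TiFlash fine grained shuffle; they are copied into each
// MPPDispatchRequest when the fragment enables the feature (see executor/mpp_gather.go).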
TiFlashFineGrainedShuffleStreamCount uint32
TiFlashFineGrainedShuffleBatchSize int64
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
12 changes: 12 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -1585,6 +1585,18 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleStreamCount, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleStreamCount), Type: TypeInt, MinValue: 0, MaxValue: 1024,
SetSession: func(s *SessionVars, val string) error {
s.TiFlashFineGrainedShuffleStreamCount = uint32(TidbOptInt64(val, DefTiFlashFineGrainedShuffleStreamCount))
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleBatchSize, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleBatchSize), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64,
SetSession: func(s *SessionVars, val string) error {
s.TiFlashFineGrainedShuffleBatchSize = TidbOptInt64(val, DefTiFlashFineGrainedShuffleBatchSize)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
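Both variables are ScopeGlobal | ScopeSession, so they are settable at either scope once this lands. A minimal smoke test of the session and global paths, assuming the SQL names follow the usual snake_case convention (tiflash_fine_grained_shuffle_stream_count and tiflash_fine_grained_shuffle_batch_size; the authoritative strings live in sessionctx/variable/tidb_vars.go):

package tiflashtest

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

func TestFineGrainedShuffleSysVars(t *testing.T) {
	store, clean := testkit.CreateMockStore(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)
	// Session scope.
	tk.MustExec("set @@tiflash_fine_grained_shuffle_stream_count = 8")
	tk.MustQuery("select @@tiflash_fine_grained_shuffle_stream_count").Check(testkit.Rows("8"))
	// Global scope.
	tk.MustExec("set @@global.tiflash_fine_grained_shuffle_batch_size = 8192")
	tk.MustQuery("select @@global.tiflash_fine_grained_shuffle_batch_size").Check(testkit.Rows("8192"))
}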