Skip to content

Commit

Permalink
executor,util: Stat inner/inter zone network traffic for MPP tasks (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
yibin87 authored Jan 21, 2025
1 parent 4e1cf8a commit c34a6b6
Show file tree
Hide file tree
Showing 9 changed files with 508 additions and 95 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5906,13 +5906,13 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sha256 = "923efe448355ba420cfbd82e93df2f99b08c3fb36191b1f2cabdd3bf32904852",
strip_prefix = "github.com/pingcap/[email protected]20241105053214-f91fdb81a69e",
sha256 = "87ef3f28a30822c9e2a4966bfb573025ae332ac2a045be1026641e121fccb7e6",
strip_prefix = "github.com/pingcap/[email protected]20241212101007-246f91188357",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241105053214-f91fdb81a69e.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241105053214-f91fdb81a69e.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241105053214-f91fdb81a69e.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241105053214-f91fdb81a69e.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241212101007-246f91188357.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241212101007-246f91188357.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241212101007-246f91188357.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241212101007-246f91188357.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ require (
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20241105053214-f91fdb81a69e
github.com/pingcap/tipb v0.0.0-20241212101007-246f91188357
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.62.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,8 @@ github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TL
github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tipb v0.0.0-20241105053214-f91fdb81a69e h1:7DdrYVwWpYr4o1AyKl8T376B4h2RsMEjkmom8MxQuuM=
github.com/pingcap/tipb v0.0.0-20241105053214-f91fdb81a69e/go.mod h1:zrnYy8vReNODg8G0OiYaX9OK+kpq+rK1jHmvd1DnIWw=
github.com/pingcap/tipb v0.0.0-20241212101007-246f91188357 h1:s58UXyaWMNeaoeuVPZdrkm5Uk7NcODHqICGCUQ3A9s4=
github.com/pingcap/tipb v0.0.0-20241212101007-246f91188357/go.mod h1:zrnYy8vReNODg8G0OiYaX9OK+kpq+rK1jHmvd1DnIWw=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
24 changes: 24 additions & 0 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,8 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
// but we set it again in case we missed some code paths.
sessVars.StmtCtx.SetPlan(a.Plan)
}

a.updateMPPNetworkTraffic()
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
Expand Down Expand Up @@ -1799,6 +1801,28 @@ func GetResultRowsCount(stmtCtx *stmtctx.StatementContext, p base.Plan) int64 {
return runtimeStatsColl.GetPlanActRows(p.ID())
}

func (a *ExecStmt) updateMPPNetworkTraffic() {
sessVars := a.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
runtimeStatsColl := stmtCtx.RuntimeStatsColl
if runtimeStatsColl == nil {
return
}
tiflashNetworkStats := runtimeStatsColl.GetStmtCopRuntimeStats().TiflashNetworkStats
if tiflashNetworkStats == nil {
return
}
var tikvExecDetail *util.ExecDetails
tikvExecDetailRaw := a.GoCtx.Value(util.ExecDetailsKey)
if tikvExecDetailRaw == nil {
tikvExecDetailRaw = &util.ExecDetails{}
a.GoCtx = context.WithValue(a.GoCtx, util.ExecDetailsKey, tikvExecDetailRaw)
}

tikvExecDetail = tikvExecDetailRaw.(*util.ExecDetails)
tiflashNetworkStats.UpdateTiKVExecDetails(tikvExecDetail)
}

// getFlatPlan generates a FlatPhysicalPlan from the plan stored in stmtCtx.plan,
// then stores it in stmtCtx.flatPlan.
func getFlatPlan(stmtCtx *stmtctx.StatementContext) *plannercore.FlatPhysicalPlan {
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/internal/mpp/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
visibility = ["//pkg/executor:__subpackages__"],
deps = [
"//pkg/config",
"//pkg/ddl/placement",
"//pkg/distsql",
"//pkg/executor/internal/builder",
"//pkg/executor/internal/util",
Expand All @@ -25,6 +26,7 @@ go_library(
"//pkg/store/copr",
"//pkg/store/driver/backoff",
"//pkg/store/driver/error",
"//pkg/store/helper",
"//pkg/util",
"//pkg/util/execdetails",
"//pkg/util/logutil",
Expand All @@ -34,6 +36,7 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_zap//:zap",
],
Expand Down
149 changes: 147 additions & 2 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/distsql"
"github.com/pingcap/tidb/pkg/executor/internal/builder"
"github.com/pingcap/tidb/pkg/executor/internal/util"
Expand All @@ -40,11 +41,13 @@ import (
"github.com/pingcap/tidb/pkg/store/copr"
"github.com/pingcap/tidb/pkg/store/driver/backoff"
derr "github.com/pingcap/tidb/pkg/store/driver/error"
"github.com/pingcap/tidb/pkg/store/helper"
util2 "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikv"
clientutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -189,7 +192,7 @@ func NewLocalMPPCoordinator(ctx context.Context, sctx sessionctx.Context, is inf
return coord
}

func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment) error {
func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment, allTiFlashZoneInfo map[string]string) error {
dagReq, err := builder.ConstructDAGReq(c.sessionCtx, []base.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash)
if err != nil {
return errors.Trace(err)
Expand All @@ -202,6 +205,8 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment) err
} else {
dagReq.EncodeType = tipb.EncodeType_TypeChunk
}
zoneHelper := taskZoneInfoHelper{}
zoneHelper.init(allTiFlashZoneInfo)
for _, mppTask := range pf.ExchangeSender.Tasks {
if mppTask.PartitionTableIDs != nil {
err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
Expand All @@ -217,6 +222,9 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment) err
if err != nil {
return err
}
zoneHelper.isRoot = pf.IsRoot
zoneHelper.currentTaskZone = zoneHelper.allTiFlashZoneInfo[mppTask.Meta.GetAddress()]
zoneHelper.fillSameZoneFlagForExchange(dagReq.RootExecutor)
pbData, err := dagReq.Marshal()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -343,6 +351,127 @@ func (c *localMppCoordinator) fixTaskForCTEStorageAndReader(exec *tipb.Executor,
return nil
}

// taskZoneInfoHelper used to help reset exchange executor's same zone flags
type taskZoneInfoHelper struct {
allTiFlashZoneInfo map[string]string
// exchangeZoneInfo is used to cache one mpp task's zone info:
// key is executor id, value is zone info array
// for ExchangeSender, it's target tiflash nodes' zone info; for ExchangeReceiver, it's source tiflash nodes' zone info
exchangeZoneInfo map[string][]string
tidbZone string
currentTaskZone string
isRoot bool
}

func (h *taskZoneInfoHelper) init(allTiFlashZoneInfo map[string]string) {
h.tidbZone = config.GetGlobalConfig().Labels[placement.DCLabelKey]
h.allTiFlashZoneInfo = allTiFlashZoneInfo
// initial capacity to 2, for one exchange sender and one exchange receiver
h.exchangeZoneInfo = make(map[string][]string, 2)
}

func (h *taskZoneInfoHelper) tryQuickFillWithUncertainZones(exec *tipb.Executor, slots int, sameZoneFlags []bool) (bool, []bool) {
if exec.ExecutorId == nil || len(h.currentTaskZone) == 0 {
for i := 0; i < slots; i++ {
sameZoneFlags = append(sameZoneFlags, true)
}
return true, sameZoneFlags
}
if h.isRoot && exec.Tp == tipb.ExecType_TypeExchangeSender {
sameZoneFlags = append(sameZoneFlags, len(h.tidbZone) == 0 || h.currentTaskZone == h.tidbZone)
return true, sameZoneFlags
}

// For CTE exchange nodes, data is passed locally, set all to true
if (exec.Tp == tipb.ExecType_TypeExchangeSender && len(exec.ExchangeSender.UpstreamCteTaskMeta) != 0) ||
(exec.Tp == tipb.ExecType_TypeExchangeReceiver && len(exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta) != 0) {
for i := 0; i < slots; i++ {
sameZoneFlags = append(sameZoneFlags, true)
}
return true, sameZoneFlags
}

return false, sameZoneFlags
}

func (h *taskZoneInfoHelper) collectExchangeZoneInfos(encodedTaskMeta [][]byte, slots int) []string {
zoneInfos := make([]string, 0, slots)
for _, taskBytes := range encodedTaskMeta {
taskMeta := &mpp.TaskMeta{}
err := taskMeta.Unmarshal(taskBytes)
if err != nil {
zoneInfos = append(zoneInfos, "")
continue
}
zoneInfos = append(zoneInfos, h.allTiFlashZoneInfo[taskMeta.GetAddress()])
}
return zoneInfos
}

func (h *taskZoneInfoHelper) inferSameZoneFlag(exec *tipb.Executor, encodedTaskMeta [][]byte) []bool {
slots := len(encodedTaskMeta)
sameZoneFlags := make([]bool, 0, slots)
filled := false
if filled, sameZoneFlags = h.tryQuickFillWithUncertainZones(exec, slots, sameZoneFlags); filled {
return sameZoneFlags
}
zoneInfos, exist := h.exchangeZoneInfo[*exec.ExecutorId]
if !exist {
zoneInfos = h.collectExchangeZoneInfos(encodedTaskMeta, slots)
h.exchangeZoneInfo[*exec.ExecutorId] = zoneInfos
}

if len(zoneInfos) != slots {
// This branch is for safety purpose, not expected
for i := 0; i < slots; i++ {
sameZoneFlags = append(sameZoneFlags, true)
}
return sameZoneFlags
}

for i := 0; i < slots; i++ {
sameZoneFlags = append(sameZoneFlags, len(zoneInfos[i]) == 0 || h.currentTaskZone == zoneInfos[i])
}
return sameZoneFlags
}

func (h *taskZoneInfoHelper) fillSameZoneFlagForExchange(exec *tipb.Executor) {
children := make([]*tipb.Executor, 0, 2)
switch exec.Tp {
case tipb.ExecType_TypeTableScan, tipb.ExecType_TypePartitionTableScan, tipb.ExecType_TypeIndexScan:
case tipb.ExecType_TypeSelection:
children = append(children, exec.Selection.Child)
case tipb.ExecType_TypeAggregation, tipb.ExecType_TypeStreamAgg:
children = append(children, exec.Aggregation.Child)
case tipb.ExecType_TypeTopN:
children = append(children, exec.TopN.Child)
case tipb.ExecType_TypeLimit:
children = append(children, exec.Limit.Child)
case tipb.ExecType_TypeExchangeSender:
children = append(children, exec.ExchangeSender.Child)
exec.ExchangeSender.SameZoneFlag = h.inferSameZoneFlag(exec, exec.ExchangeSender.EncodedTaskMeta)
case tipb.ExecType_TypeExchangeReceiver:
exec.ExchangeReceiver.SameZoneFlag = h.inferSameZoneFlag(exec, exec.ExchangeReceiver.EncodedTaskMeta)
case tipb.ExecType_TypeJoin:
children = append(children, exec.Join.Children...)
case tipb.ExecType_TypeProjection:
children = append(children, exec.Projection.Child)
case tipb.ExecType_TypeWindow:
children = append(children, exec.Window.Child)
case tipb.ExecType_TypeSort:
children = append(children, exec.Sort.Child)
case tipb.ExecType_TypeExpand:
children = append(children, exec.Expand.Child)
case tipb.ExecType_TypeExpand2:
children = append(children, exec.Expand2.Child)
default:
logutil.BgLogger().Warn(fmt.Sprintf("unknown new tipb protocol %d", exec.Tp))
}
for _, child := range children {
h.fillSameZoneFlagForExchange(child)
}
}

func getActualPhysicalPlan(plan base.Plan) base.PhysicalPlan {
if plan == nil {
return nil
Expand Down Expand Up @@ -788,8 +917,24 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke
}
c.nodeCnt = len(nodeInfo)

var allTiFlashZoneInfo map[string]string
if c.sessionCtx.GetStore() == nil {
allTiFlashZoneInfo = make(map[string]string)
} else if tikvStore, ok := c.sessionCtx.GetStore().(helper.Storage); ok {
cache := tikvStore.GetRegionCache()
allTiFlashStores := cache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
allTiFlashZoneInfo = make(map[string]string, len(allTiFlashStores))
for _, tiflashStore := range allTiFlashStores {
tiflashStoreAddr := tiflashStore.GetAddr()
if tiflashZone, isSet := tiflashStore.GetLabelValue(placement.DCLabelKey); isSet {
allTiFlashZoneInfo[tiflashStoreAddr] = tiflashZone
}
}
} else {
allTiFlashZoneInfo = make(map[string]string)
}
for _, frag := range frags {
err = c.appendMPPDispatchReq(frag)
err = c.appendMPPDispatchReq(frag, allTiFlashZoneInfo)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down
81 changes: 81 additions & 0 deletions pkg/executor/internal/mpp/local_mpp_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,84 @@ func TestNeedReportExecutionSummary(t *testing.T) {
limitTIDB2.SetChildren(join)
require.True(t, needReportExecutionSummary(limitTIDB2, 10, false))
}

func mockTaskZoneInfoHelper(isRoot bool, taskZone string, tidbZone string, storeZoneMpp map[string]string, exchangeZoneInfo map[string][]string) taskZoneInfoHelper {
helper := taskZoneInfoHelper{
tidbZone: tidbZone,
currentTaskZone: taskZone,
isRoot: isRoot,
allTiFlashZoneInfo: storeZoneMpp,
exchangeZoneInfo: exchangeZoneInfo,
}
return helper
}

func TestZoneHelperTryQuickFill(t *testing.T) {
slots := 3
allTiflashZoneInfo := make(map[string]string, slots)
exchangeZoneInfo := make(map[string][]string, 2)
helper := mockTaskZoneInfoHelper(false, "", "east", allTiflashZoneInfo, exchangeZoneInfo)
exchangeSenderID := "ExchangeSender_1"
sender := &tipb.Executor{
ExecutorId: &exchangeSenderID,
Tp: tipb.ExecType_TypeExchangeSender,
ExchangeSender: &tipb.ExchangeSender{
UpstreamCteTaskMeta: nil,
},
}
sameZoneFlags := make([]bool, 0, slots)
quickFill := false
// When task zone is empty, then the function returns true, and all sameZoneFlags are true
quickFill, sameZoneFlags = helper.tryQuickFillWithUncertainZones(sender, slots, sameZoneFlags)
require.True(t, quickFill)
require.Equal(t, slots, len(sameZoneFlags))
for i := 0; i < slots; i++ {
require.True(t, sameZoneFlags[i])
}

// When task is root task, and executor is exchangeSender then the function compares tidbZone with currentTaskZone
helper.isRoot = true
helper.currentTaskZone = "west"
slots = 1
sameZoneFlags = make([]bool, 0, slots)
quickFill, sameZoneFlags = helper.tryQuickFillWithUncertainZones(sender, slots, sameZoneFlags)
require.True(t, quickFill)
require.Equal(t, slots, len(sameZoneFlags))
for i := 0; i < slots; i++ {
require.False(t, sameZoneFlags[i])
}

helper.currentTaskZone = "east"
sameZoneFlags = make([]bool, 0, slots)
quickFill, sameZoneFlags = helper.tryQuickFillWithUncertainZones(sender, slots, sameZoneFlags)
require.True(t, quickFill)
require.Equal(t, slots, len(sameZoneFlags))
for i := 0; i < slots; i++ {
require.True(t, sameZoneFlags[i])
}

// When task is neither root exchange sender nor current task zone is empty, return false, and empty sameZoneFlags
helper.isRoot = false
helper.currentTaskZone = "west"
slots = 3
sameZoneFlags = make([]bool, 0, slots)
quickFill, sameZoneFlags = helper.tryQuickFillWithUncertainZones(sender, slots, sameZoneFlags)
require.False(t, quickFill)
require.Equal(t, 0, len(sameZoneFlags))

helper.isRoot = true
helper.currentTaskZone = "west"
slots = 3
sameZoneFlags = make([]bool, 0, slots)
exchangeReceiverID := "ExchangeReceiver_2"
receiver := &tipb.Executor{
ExecutorId: &exchangeReceiverID,
Tp: tipb.ExecType_TypeExchangeReceiver,
ExchangeReceiver: &tipb.ExchangeReceiver{
OriginalCtePrdocuerTaskMeta: nil,
},
}
quickFill, sameZoneFlags = helper.tryQuickFillWithUncertainZones(receiver, slots, sameZoneFlags)
require.False(t, quickFill)
require.Equal(t, 0, len(sameZoneFlags))
}
Loading

0 comments on commit c34a6b6

Please sign in to comment.