Skip to content

Commit

Permalink
topsql: compress plan instead of dropping it (#35973)
Browse files Browse the repository at this point in the history
ref #35964
  • Loading branch information
zhongzc authored Jul 6, 2022
1 parent 55aea27 commit ec9f201
Show file tree
Hide file tree
Showing 14 changed files with 102 additions and 56 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2523,8 +2523,8 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sum = "h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA=",
version = "v0.0.0-20220704030114-0f4f873beca8",
sum = "h1:XaTE4ZhQbQtQZtAVzlZh/Pf6SjFfMSTe1ia2nGcl36Y=",
version = "v0.0.0-20220706024432-7be3cc83a7d5",
)
go_repository(
name = "com_github_pkg_browser",
Expand Down
1 change: 1 addition & 0 deletions executor/seqtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_test(
"//ddl/util",
"//errno",
"//executor",
"//infoschema",
"//kv",
"//meta/autoid",
"//metrics",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ require (
github.com/pingcap/log v1.1.0
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8
github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.32.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,8 @@ github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops=
github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8 h1:oYn6UiUSnVlMBr4rLOweNWtdAon5wCLnLGDSFf/8kMA=
github.com/pingcap/tipb v0.0.0-20220704030114-0f4f873beca8/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5 h1:XaTE4ZhQbQtQZtAVzlZh/Pf6SjFfMSTe1ia2nGcl36Y=
github.com/pingcap/tipb v0.0.0-20220706024432-7be3cc83a7d5/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion meta/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ go_library(
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -33,6 +32,7 @@ go_test(
embed = [":meta"],
flaky = True,
deps = [
"//ddl",
"//kv",
"//parser/model",
"//store/mockstore",
Expand Down
1 change: 1 addition & 0 deletions session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ go_test(
"//domain",
"//errno",
"//executor",
"//infoschema",
"//kv",
"//meta",
"//parser/ast",
Expand Down
6 changes: 5 additions & 1 deletion util/topsql/collector/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ func (c *TopSQLCollector) RegisterSQL(sqlDigest []byte, normalizedSQL string, is
}

// RegisterPlan uses for testing.
func (c *TopSQLCollector) RegisterPlan(planDigest []byte, normalizedPlan string) {
func (c *TopSQLCollector) RegisterPlan(planDigest []byte, normalizedPlan string, isLarge bool) {
if isLarge {
return
}

digestStr := string(hack.String(planDigest))
c.Lock()
_, ok := c.planMap[digestStr]
Expand Down
1 change: 1 addition & 0 deletions util/topsql/reporter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//config",
"//metrics",
"//util",
"//util/hack",
"//util/logutil",
"//util/topsql/collector",
"//util/topsql/state",
Expand Down
43 changes: 33 additions & 10 deletions util/topsql/reporter/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"sync/atomic"

"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/topsql/collector"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
Expand Down Expand Up @@ -594,6 +595,13 @@ type sqlMeta struct {
isInternal bool
}

// planMeta contains a binaryNormalizedPlan and a bool field isLarge to indicate
// whether that binaryNormalizedPlan is too large to decode quickly
type planMeta struct {
binaryNormalizedPlan string
isLarge bool
}

// normalizedSQLMap is a wrapped map used to register normalizedSQL.
type normalizedSQLMap struct {
data atomic.Value // *sync.Map
Expand Down Expand Up @@ -654,6 +662,10 @@ func (m *normalizedSQLMap) toProto() []tipb.SQLMeta {
// normalizedPlanMap to protobuf representation.
type planBinaryDecodeFunc func(string) (string, error)

// planBinaryCompressFunc is used to compress large normalized plan
// into encoded format
type planBinaryCompressFunc func([]byte) string

// normalizedSQLMap is a wrapped map used to register normalizedPlan.
type normalizedPlanMap struct {
data atomic.Value // *sync.Map
Expand All @@ -668,13 +680,16 @@ func newNormalizedPlanMap() *normalizedPlanMap {

// register saves the relationship between planDigest and normalizedPlan.
// If the internal map size exceeds the limit, the relationship will be discarded.
func (m *normalizedPlanMap) register(planDigest []byte, normalizedPlan string) {
func (m *normalizedPlanMap) register(planDigest []byte, normalizedPlan string, isLarge bool) {
if m.length.Load() >= topsqlstate.GlobalState.MaxCollect.Load() {
ignoreExceedPlanCounter.Inc()
return
}
data := m.data.Load().(*sync.Map)
_, loaded := data.LoadOrStore(string(planDigest), normalizedPlan)
_, loaded := data.LoadOrStore(string(planDigest), planMeta{
binaryNormalizedPlan: normalizedPlan,
isLarge: isLarge,
})
if !loaded {
m.length.Add(1)
}
Expand All @@ -693,18 +708,26 @@ func (m *normalizedPlanMap) take() *normalizedPlanMap {
}

// toProto converts the normalizedPlanMap to the corresponding protobuf representation.
func (m *normalizedPlanMap) toProto(decodePlan planBinaryDecodeFunc) []tipb.PlanMeta {
func (m *normalizedPlanMap) toProto(decodePlan planBinaryDecodeFunc, compressPlan planBinaryCompressFunc) []tipb.PlanMeta {
metas := make([]tipb.PlanMeta, 0, m.length.Load())
m.data.Load().(*sync.Map).Range(func(k, v interface{}) bool {
planDecoded, errDecode := decodePlan(v.(string))
if errDecode != nil {
logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode))
originalMeta := v.(planMeta)
protoMeta := tipb.PlanMeta{
PlanDigest: hack.Slice(k.(string)),
}

var err error
if originalMeta.isLarge {
protoMeta.EncodedNormalizedPlan = compressPlan(hack.Slice(originalMeta.binaryNormalizedPlan))
} else {
protoMeta.NormalizedPlan, err = decodePlan(originalMeta.binaryNormalizedPlan)
}
if err != nil {
logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(err))
return true
}
metas = append(metas, tipb.PlanMeta{
PlanDigest: []byte(k.(string)),
NormalizedPlan: planDecoded,
})

metas = append(metas, protoMeta)
return true
})
return metas
Expand Down
42 changes: 25 additions & 17 deletions util/topsql/reporter/datamodel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,26 +400,32 @@ func Test_normalizedSQLMap_toProto(t *testing.T) {
func Test_normalizedPlanMap_register(t *testing.T) {
topsqlstate.GlobalState.MaxCollect.Store(2)
m := newNormalizedPlanMap()
m.register([]byte("PLAN-1"), "PLAN-1")
m.register([]byte("PLAN-2"), "PLAN-2")
m.register([]byte("PLAN-3"), "PLAN-3")
m.register([]byte("PLAN-1"), "PLAN-1", false)
m.register([]byte("PLAN-2"), "PLAN-2", true)
m.register([]byte("PLAN-3"), "PLAN-3", false)
require.Equal(t, int64(2), m.length.Load())
v, ok := m.data.Load().(*sync.Map).Load("PLAN-1")
require.True(t, ok)
require.Equal(t, "PLAN-1", v.(string))
require.Equal(t, planMeta{
binaryNormalizedPlan: "PLAN-1",
isLarge: false,
}, v.(planMeta))
v, ok = m.data.Load().(*sync.Map).Load("PLAN-2")
require.True(t, ok)
require.Equal(t, "PLAN-2", v.(string))
require.Equal(t, planMeta{
binaryNormalizedPlan: "PLAN-2",
isLarge: true,
}, v.(planMeta))
_, ok = m.data.Load().(*sync.Map).Load("PLAN-3")
require.False(t, ok)
}

func Test_normalizedPlanMap_take(t *testing.T) {
topsqlstate.GlobalState.MaxCollect.Store(999)
m1 := newNormalizedPlanMap()
m1.register([]byte("PLAN-1"), "PLAN-1")
m1.register([]byte("PLAN-2"), "PLAN-2")
m1.register([]byte("PLAN-3"), "PLAN-3")
m1.register([]byte("PLAN-1"), "PLAN-1", false)
m1.register([]byte("PLAN-2"), "PLAN-2", false)
m1.register([]byte("PLAN-3"), "PLAN-3", false)
m2 := m1.take()
require.Equal(t, int64(0), m1.length.Load())
require.Equal(t, int64(3), m2.length.Load())
Expand All @@ -442,26 +448,28 @@ func Test_normalizedPlanMap_take(t *testing.T) {
func Test_normalizedPlanMap_toProto(t *testing.T) {
topsqlstate.GlobalState.MaxCollect.Store(999)
m := newNormalizedPlanMap()
m.register([]byte("PLAN-1"), "PLAN-1")
m.register([]byte("PLAN-2"), "PLAN-2")
m.register([]byte("PLAN-3"), "PLAN-3")
pb := m.toProto(func(s string) (string, error) { return s, nil })
m.register([]byte("PLAN-1"), "PLAN-1", false)
m.register([]byte("PLAN-2"), "PLAN-2", true)
m.register([]byte("PLAN-3"), "PLAN-3", false)
pb := m.toProto(
func(s string) (string, error) { return "[decoded] " + s, nil },
func(s []byte) string { return "[encoded] " + string(s) })
require.Len(t, pb, 3)
hash := map[string]tipb.PlanMeta{}
for _, meta := range pb {
hash[meta.NormalizedPlan] = meta
hash[string(meta.PlanDigest)] = meta
}
require.Equal(t, tipb.PlanMeta{
PlanDigest: []byte("PLAN-1"),
NormalizedPlan: "PLAN-1",
NormalizedPlan: "[decoded] PLAN-1",
}, hash["PLAN-1"])
require.Equal(t, tipb.PlanMeta{
PlanDigest: []byte("PLAN-2"),
NormalizedPlan: "PLAN-2",
PlanDigest: []byte("PLAN-2"),
EncodedNormalizedPlan: "[encoded] PLAN-2",
}, hash["PLAN-2"])
require.Equal(t, tipb.PlanMeta{
PlanDigest: []byte("PLAN-3"),
NormalizedPlan: "PLAN-3",
NormalizedPlan: "[decoded] PLAN-3",
}, hash["PLAN-3"])
}

Expand Down
15 changes: 10 additions & 5 deletions util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type TopSQLReporter interface {
RegisterSQL(sqlDigest []byte, normalizedSQL string, isInternal bool)

// RegisterPlan like RegisterSQL, but for normalized plan strings.
RegisterPlan(planDigest []byte, normalizedPlan string)
// isLarge indicates the size of normalizedPlan is big.
RegisterPlan(planDigest []byte, normalizedPlan string, isLarge bool)

// Close uses to close and release the reporter resource.
Close()
Expand Down Expand Up @@ -80,12 +81,15 @@ type RemoteTopSQLReporter struct {

// calling decodePlan this can take a while, so should not block critical paths.
decodePlan planBinaryDecodeFunc

// Instead of dropping large plans, we compress it into encoded format and report
compressPlan planBinaryCompressFunc
}

// NewRemoteTopSQLReporter creates a new RemoteTopSQLReporter.
//
// decodePlan is a decoding function which will be called asynchronously to decode the plan binary to string.
func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc, compressPlan planBinaryCompressFunc) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
DefaultDataSinkRegisterer: NewDefaultDataSinkRegisterer(ctx),
Expand All @@ -99,6 +103,7 @@ func NewRemoteTopSQLReporter(decodePlan planBinaryDecodeFunc) *RemoteTopSQLRepor
normalizedPlanMap: newNormalizedPlanMap(),
stmtStatsBuffer: map[uint64]stmtstats.StatementStatsMap{},
decodePlan: decodePlan,
compressPlan: compressPlan,
}
tsr.sqlCPUCollector = collector.NewSQLCPUCollector(tsr)
return tsr
Expand Down Expand Up @@ -153,8 +158,8 @@ func (tsr *RemoteTopSQLReporter) RegisterSQL(sqlDigest []byte, normalizedSQL str
// RegisterPlan implements TopSQLReporter.
//
// This function is thread-safe and efficient.
func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedPlan string) {
tsr.normalizedPlanMap.register(planDigest, normalizedPlan)
func (tsr *RemoteTopSQLReporter) RegisterPlan(planDigest []byte, normalizedPlan string, isLarge bool) {
tsr.normalizedPlanMap.register(planDigest, normalizedPlan, isLarge)
}

// Close implements TopSQLReporter.
Expand Down Expand Up @@ -270,7 +275,7 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
tsr.doReport(&ReportData{
DataRecords: rs.toProto(),
SQLMetas: data.normalizedSQLMap.toProto(),
PlanMetas: data.normalizedPlanMap.toProto(tsr.decodePlan),
PlanMetas: data.normalizedPlanMap.toProto(tsr.decodePlan, tsr.compressPlan),
})
case <-tsr.ctx.Done():
return
Expand Down
18 changes: 11 additions & 7 deletions util/topsql/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func populateCache(tsr *RemoteTopSQLReporter, begin, end int, timestamp uint64)
for i := begin; i < end; i++ {
key := []byte("planDigest" + strconv.Itoa(i+1))
value := "planNormalized" + strconv.Itoa(i+1)
tsr.RegisterPlan(key, value)
tsr.RegisterPlan(key, value, false)
}
// collect
var records []collector.SQLCPUTimeRecord
Expand All @@ -63,14 +63,18 @@ func reportCache(tsr *RemoteTopSQLReporter) {
tsr.doReport(&ReportData{
DataRecords: tsr.collecting.take().getReportRecords().toProto(),
SQLMetas: tsr.normalizedSQLMap.take().toProto(),
PlanMetas: tsr.normalizedPlanMap.take().toProto(tsr.decodePlan),
PlanMetas: tsr.normalizedPlanMap.take().toProto(tsr.decodePlan, tsr.compressPlan),
})
}

func mockPlanBinaryDecoderFunc(plan string) (string, error) {
return plan, nil
}

func mockPlanBinaryCompressFunc(plan []byte) string {
return string(plan)
}

type mockDataSink struct {
ch chan *ReportData
}
Expand All @@ -94,7 +98,7 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int) (*RemoteTopSQLRep
topsqlstate.GlobalState.MaxCollect.Store(10000)
topsqlstate.GlobalState.ReportIntervalSeconds.Store(int64(interval))
topsqlstate.EnableTopSQL()
ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
ts := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc)
ds := newMockDataSink2()
if err := ts.Register(ds); err != nil {
panic(err)
Expand Down Expand Up @@ -194,7 +198,7 @@ func newSQLCPUTimeRecord(tsr *RemoteTopSQLReporter, sqlID int, cpuTimeMs uint32)

key = []byte("planDigest" + strconv.Itoa(sqlID))
value = "planNormalized" + strconv.Itoa(sqlID)
tsr.RegisterPlan(key, value)
tsr.RegisterPlan(key, value, false)

return collector.SQLCPUTimeRecord{
SQLDigest: []byte("sqlDigest" + strconv.Itoa(sqlID)),
Expand Down Expand Up @@ -317,7 +321,7 @@ func TestCollectCapacity(t *testing.T) {
for i := 0; i < n; i++ {
key := []byte("planDigest" + strconv.Itoa(i))
value := "planNormalized" + strconv.Itoa(i)
tsr.RegisterPlan(key, value)
tsr.RegisterPlan(key, value, false)
}
}
genRecord := func(n int) []collector.SQLCPUTimeRecord {
Expand Down Expand Up @@ -391,7 +395,7 @@ func TestMultipleDataSinks(t *testing.T) {
topsqlstate.GlobalState.ReportIntervalSeconds.Store(1)
topsqlstate.EnableTopSQL()

tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc)

var chs []chan *ReportData
for i := 0; i < 7; i++ {
Expand Down Expand Up @@ -477,7 +481,7 @@ func TestMultipleDataSinks(t *testing.T) {
func TestReporterWorker(t *testing.T) {
topsqlstate.GlobalState.ReportIntervalSeconds.Store(3)

r := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc)
r := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc)
r.Start()
defer r.Close()

Expand Down
9 changes: 2 additions & 7 deletions util/topsql/topsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
)

func init() {
remoteReporter := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan)
remoteReporter := reporter.NewRemoteTopSQLReporter(plancodec.DecodeNormalizedPlan, plancodec.Compress)
globalTopSQLReport = remoteReporter
singleTargetDataSink = reporter.NewSingleTargetDataSink(remoteReporter)
}
Expand Down Expand Up @@ -182,10 +182,5 @@ func linkSQLTextWithDigest(sqlDigest []byte, normalizedSQL string, isInternal bo
}

func linkPlanTextWithDigest(planDigest []byte, normalizedBinaryPlan string) {
if len(normalizedBinaryPlan) > MaxBinaryPlanSize {
// ignore the huge size plan
return
}

globalTopSQLReport.RegisterPlan(planDigest, normalizedBinaryPlan)
globalTopSQLReport.RegisterPlan(planDigest, normalizedBinaryPlan, len(normalizedBinaryPlan) > MaxBinaryPlanSize)
}
Loading

0 comments on commit ec9f201

Please sign in to comment.