Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

telemetry, session: add telemetry collector for placement policies #33165

Merged
merged 4 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,9 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {

func init() {
initByLDFlagsForGlobalKill()
telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema {
return GetDomain(ctx).InfoSchema()
}
}

var (
Expand Down
28 changes: 0 additions & 28 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3378,34 +3378,6 @@ func (s *session) GetTxnWriteThroughputSLI() *sli.TxnWriteThroughputSLI {
return &s.txn.writeSLI
}

var _ telemetry.TemporaryOrCacheTableFeatureChecker = &session{}

// TemporaryTableExists is used by the telemetry package to avoid circle dependency.
func (s *session) TemporaryTableExists() bool {
is := domain.GetDomain(s).InfoSchema()
for _, dbInfo := range is.AllSchemas() {
for _, tbInfo := range is.SchemaTables(dbInfo.Name) {
if tbInfo.Meta().TempTableType != model.TempTableNone {
return true
}
}
}
return false
}

// CachedTableExists is used by the telemetry package to avoid circle dependency.
func (s *session) CachedTableExists() bool {
is := domain.GetDomain(s).InfoSchema()
for _, dbInfo := range is.AllSchemas() {
for _, tbInfo := range is.SchemaTables(dbInfo.Name) {
if tbInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return true
}
}
}
return false
}

// GetInfoSchema returns snapshotInfoSchema if snapshot schema is set.
// Transaction infoschema is returned if inside an explicit txn.
// Otherwise the latest infoschema is returned.
Expand Down
81 changes: 61 additions & 20 deletions telemetry/data_feature_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,82 @@ type featureUsage struct {
Txn *TxnUsage `json:"txn"`
// cluster index usage information
// key is the first 6 characters of sha2(TABLE_NAME, 256)
ClusterIndex *ClusterIndexUsage `json:"clusterIndex"`
TemporaryTable bool `json:"temporaryTable"`
CTE *m.CTEUsageCounter `json:"cte"`
CachedTable bool `json:"cachedTable"`
AutoCapture bool `json:"autoCapture"`
ClusterIndex *ClusterIndexUsage `json:"clusterIndex"`
TemporaryTable bool `json:"temporaryTable"`
CTE *m.CTEUsageCounter `json:"cte"`
CachedTable bool `json:"cachedTable"`
AutoCapture bool `json:"autoCapture"`
PlacementPolicyUsage *placementPolicyUsage `json:"placementPolicy"`
}

type placementPolicyUsage struct {
NumPlacementPolicies uint64 `json:"numPlacementPolicies"`
NumDBWithPolicies uint64 `json:"numDBWithPolicies"`
NumTableWithPolicies uint64 `json:"numTableWithPolicies"`
// The number of partitions that policies are explicitly specified.
NumPartitionWithExplicitPolicies uint64 `json:"numPartitionWithExplicitPolicies"`
}

func getFeatureUsage(ctx sessionctx.Context) (*featureUsage, error) {
clusterIdxUsage, err := getClusterIndexUsageInfo(ctx)
var usage featureUsage
var err error
usage.ClusterIndex, err = getClusterIndexUsageInfo(ctx)
if err != nil {
logutil.BgLogger().Info(err.Error())
return nil, err
}

// transaction related feature
txnUsage := getTxnUsageInfo(ctx)
usage.Txn = getTxnUsageInfo(ctx)

usage.CTE = getCTEUsageInfo()

// Avoid the circle dependency.
temporaryTable := ctx.(TemporaryOrCacheTableFeatureChecker).TemporaryTableExists()
usage.AutoCapture = getAutoCaptureUsageInfo(ctx)

cteUsage := getCTEUsageInfo()
collectFeatureUsageFromInfoschema(ctx, &usage)
return &usage, nil
}

cachedTable := ctx.(TemporaryOrCacheTableFeatureChecker).CachedTableExists()
// collectFeatureUsageFromInfoschema updates the usage for temporary table, cached table and placement policies.
func collectFeatureUsageFromInfoschema(ctx sessionctx.Context, usage *featureUsage) {
if usage.PlacementPolicyUsage == nil {
usage.PlacementPolicyUsage = &placementPolicyUsage{}
}
is := GetDomainInfoSchema(ctx)
for _, dbInfo := range is.AllSchemas() {
if dbInfo.PlacementPolicyRef != nil {
usage.PlacementPolicyUsage.NumDBWithPolicies++
}

enableAutoCapture := getAutoCaptureUsageInfo(ctx)
return &featureUsage{txnUsage, clusterIdxUsage, temporaryTable, cteUsage, cachedTable, enableAutoCapture}, nil
for _, tbInfo := range is.SchemaTables(dbInfo.Name) {
if tbInfo.Meta().TempTableType != model.TempTableNone {
usage.TemporaryTable = true
}
if tbInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
usage.CachedTable = true
}
if tbInfo.Meta().PlacementPolicyRef != nil {
usage.PlacementPolicyUsage.NumTableWithPolicies++
}
partitions := tbInfo.Meta().GetPartitionInfo()
if partitions == nil {
continue
}
for _, partitionInfo := range partitions.Definitions {
if partitionInfo.PlacementPolicyRef != nil {
usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies++
}
}
}
}

usage.PlacementPolicyUsage.NumPlacementPolicies += uint64(len(is.AllPlacementPolicies()))
}

// GetDomainInfoSchema is used by the telemetry package to get the latest schema information
// while avoiding circle dependency with domain package.
var GetDomainInfoSchema func(sessionctx.Context) infoschema.InfoSchema

// ClusterIndexUsage records the usage info of all the tables, no more than 10k tables
type ClusterIndexUsage map[string]TableClusteredInfo

Expand Down Expand Up @@ -139,13 +187,6 @@ func getClusterIndexUsageInfo(ctx sessionctx.Context) (cu *ClusterIndexUsage, er
return &usage, nil
}

// TemporaryOrCacheTableFeatureChecker is defined to avoid package circle dependency.
// The session struct implements this interface.
type TemporaryOrCacheTableFeatureChecker interface {
TemporaryTableExists() bool
CachedTableExists() bool
}

// TxnUsage records the usage info of transaction related features, including
// async-commit, 1PC and counters of transactions committed with different protocols.
type TxnUsage struct {
Expand Down
43 changes: 43 additions & 0 deletions telemetry/data_feature_usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,49 @@ func TestCachedTable(t *testing.T) {
require.False(t, usage.CachedTable)
}

func TestPlacementPolicies(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

usage, err := telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumPlacementPolicies)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumDBWithPolicies)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumTableWithPolicies)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies)

tk.MustExec("create placement policy p1 followers=4;")
tk.MustExec(`create placement policy p2 primary_region="cn-east-1" regions="cn-east-1,cn-east"`)
tk.MustExec(`create placement policy p3 followers=3`)
tk.MustExec("alter database test placement policy=p1;")
tk.MustExec("create table t1(a int);")
tk.MustExec("create table t2(a int) placement policy=p2;")
tk.MustExec("create table t3(id int) PARTITION BY RANGE (id) (" +
"PARTITION p0 VALUES LESS THAN (100) placement policy p3," +
"PARTITION p1 VALUES LESS THAN (1000))")

usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(3), usage.PlacementPolicyUsage.NumPlacementPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumDBWithPolicies)
require.Equal(t, uint64(3), usage.PlacementPolicyUsage.NumTableWithPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies)

tk.MustExec("drop table t2;")
tk.MustExec("drop placement policy p2;")
tk.MustExec("alter table t3 placement policy=default")

usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(2), usage.PlacementPolicyUsage.NumPlacementPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumDBWithPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumTableWithPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies)
}

func TestAutoCapture(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down