From b623332afff4ba4db6b9a84dc7ef4b58871710f3 Mon Sep 17 00:00:00 2001 From: bb7133 Date: Thu, 17 Mar 2022 00:13:16 +0800 Subject: [PATCH] telemetry, session: add telemetry collector for placement policies --- domain/domain.go | 3 ++ session/session.go | 28 ---------- telemetry/data_feature_usage.go | 81 +++++++++++++++++++++------- telemetry/data_feature_usage_test.go | 43 +++++++++++++++ 4 files changed, 107 insertions(+), 48 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 57dbc5ac01090..095bd4e4fe559 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1794,6 +1794,9 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) { func init() { initByLDFlagsForGlobalKill() + telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema { + return GetDomain(ctx).InfoSchema() + } } var ( diff --git a/session/session.go b/session/session.go index 7da545b55029e..2e8449182ca93 100644 --- a/session/session.go +++ b/session/session.go @@ -3378,34 +3378,6 @@ func (s *session) GetTxnWriteThroughputSLI() *sli.TxnWriteThroughputSLI { return &s.txn.writeSLI } -var _ telemetry.TemporaryOrCacheTableFeatureChecker = &session{} - -// TemporaryTableExists is used by the telemetry package to avoid circle dependency. -func (s *session) TemporaryTableExists() bool { - is := domain.GetDomain(s).InfoSchema() - for _, dbInfo := range is.AllSchemas() { - for _, tbInfo := range is.SchemaTables(dbInfo.Name) { - if tbInfo.Meta().TempTableType != model.TempTableNone { - return true - } - } - } - return false -} - -// CachedTableExists is used by the telemetry package to avoid circle dependency. -func (s *session) CachedTableExists() bool { - is := domain.GetDomain(s).InfoSchema() - for _, dbInfo := range is.AllSchemas() { - for _, tbInfo := range is.SchemaTables(dbInfo.Name) { - if tbInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable { - return true - } - } - } - return false -} - // GetInfoSchema returns snapshotInfoSchema if snapshot schema is set. // Transaction infoschema is returned if inside an explicit txn. // Otherwise the latest infoschema is returned. diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index e999269ce6a4e..8854e72acd3b9 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -33,34 +33,82 @@ type featureUsage struct { Txn *TxnUsage `json:"txn"` // cluster index usage information // key is the first 6 characters of sha2(TABLE_NAME, 256) - ClusterIndex *ClusterIndexUsage `json:"clusterIndex"` - TemporaryTable bool `json:"temporaryTable"` - CTE *m.CTEUsageCounter `json:"cte"` - CachedTable bool `json:"cachedTable"` - AutoCapture bool `json:"autoCapture"` + ClusterIndex *ClusterIndexUsage `json:"clusterIndex"` + TemporaryTable bool `json:"temporaryTable"` + CTE *m.CTEUsageCounter `json:"cte"` + CachedTable bool `json:"cachedTable"` + AutoCapture bool `json:"autoCapture"` + PlacementPolicyUsage *placementPolicyUsage `json:"placementPolicy"` +} + +type placementPolicyUsage struct { + NumPlacementPolicies uint64 `json:"numPlacementPolicies"` + NumDBWithPolicies uint64 `json:"numDBWithPolicies"` + NumTableWithPolicies uint64 `json:"numTableWithPolicies"` + // The number of partitions that policies are explicitly specified. + NumPartitionWithExplicitPolicies uint64 `json:"numPartitionWithExplicitPolicies"` } func getFeatureUsage(ctx sessionctx.Context) (*featureUsage, error) { - clusterIdxUsage, err := getClusterIndexUsageInfo(ctx) + var usage featureUsage + var err error + usage.ClusterIndex, err = getClusterIndexUsageInfo(ctx) if err != nil { logutil.BgLogger().Info(err.Error()) return nil, err } // transaction related feature - txnUsage := getTxnUsageInfo(ctx) + usage.Txn = getTxnUsageInfo(ctx) + + usage.CTE = getCTEUsageInfo() - // Avoid the circle dependency. - temporaryTable := ctx.(TemporaryOrCacheTableFeatureChecker).TemporaryTableExists() + usage.AutoCapture = getAutoCaptureUsageInfo(ctx) - cteUsage := getCTEUsageInfo() + collectFeatureUsageFromInfoschema(ctx, &usage) + return &usage, nil +} - cachedTable := ctx.(TemporaryOrCacheTableFeatureChecker).CachedTableExists() +// collectFeatureUsageFromInfoschema updates the usage for temporary table, cached table and placement policies. +func collectFeatureUsageFromInfoschema(ctx sessionctx.Context, usage *featureUsage) { + if usage.PlacementPolicyUsage == nil { + usage.PlacementPolicyUsage = &placementPolicyUsage{} + } + is := GetDomainInfoSchema(ctx) + for _, dbInfo := range is.AllSchemas() { + if dbInfo.PlacementPolicyRef != nil { + usage.PlacementPolicyUsage.NumDBWithPolicies++ + } - enableAutoCapture := getAutoCaptureUsageInfo(ctx) - return &featureUsage{txnUsage, clusterIdxUsage, temporaryTable, cteUsage, cachedTable, enableAutoCapture}, nil + for _, tbInfo := range is.SchemaTables(dbInfo.Name) { + if tbInfo.Meta().TempTableType != model.TempTableNone { + usage.TemporaryTable = true + } + if tbInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable { + usage.CachedTable = true + } + if tbInfo.Meta().PlacementPolicyRef != nil { + usage.PlacementPolicyUsage.NumTableWithPolicies++ + } + partitions := tbInfo.Meta().GetPartitionInfo() + if partitions == nil { + continue + } + for _, partitionInfo := range partitions.Definitions { + if partitionInfo.PlacementPolicyRef != nil { + usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies++ + } + } + } + } + + usage.PlacementPolicyUsage.NumPlacementPolicies += uint64(len(is.AllPlacementPolicies())) } +// GetDomainInfoSchema is used by the telemetry package to get the latest schema information +// while avoiding circle dependency with domain package. +var GetDomainInfoSchema func(sessionctx.Context) infoschema.InfoSchema + // ClusterIndexUsage records the usage info of all the tables, no more than 10k tables type ClusterIndexUsage map[string]TableClusteredInfo @@ -139,13 +187,6 @@ func getClusterIndexUsageInfo(ctx sessionctx.Context) (cu *ClusterIndexUsage, er return &usage, nil } -// TemporaryOrCacheTableFeatureChecker is defined to avoid package circle dependency. -// The session struct implements this interface. -type TemporaryOrCacheTableFeatureChecker interface { - TemporaryTableExists() bool - CachedTableExists() bool -} - // TxnUsage records the usage info of transaction related features, including // async-commit, 1PC and counters of transactions committed with different protocols. type TxnUsage struct { diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index 5f885411dc486..bbae540f56019 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -126,6 +126,49 @@ func TestCachedTable(t *testing.T) { require.False(t, usage.CachedTable) } +func TestPlacementPolicies(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumPlacementPolicies) + require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumDBWithPolicies) + require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumTableWithPolicies) + require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies) + + tk.MustExec("create placement policy p1 followers=4;") + tk.MustExec(`create placement policy p2 primary_region="cn-east-1" regions="cn-east-1,cn-east"`) + tk.MustExec(`create placement policy p3 followers=3`) + tk.MustExec("alter database test placement policy=p1;") + tk.MustExec("create table t1(a int);") + tk.MustExec("create table t2(a int) placement policy=p2;") + tk.MustExec("create table t3(id int) PARTITION BY RANGE (id) (" + + "PARTITION p0 VALUES LESS THAN (100) placement policy p3," + + "PARTITION p1 VALUES LESS THAN (1000))") + + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, uint64(3), usage.PlacementPolicyUsage.NumPlacementPolicies) + require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumDBWithPolicies) + require.Equal(t, uint64(3), usage.PlacementPolicyUsage.NumTableWithPolicies) + require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies) + + tk.MustExec("drop table t2;") + tk.MustExec("drop placement policy p2;") + tk.MustExec("alter table t3 placement policy=default") + + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, uint64(2), usage.PlacementPolicyUsage.NumPlacementPolicies) + require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumDBWithPolicies) + require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumTableWithPolicies) + require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies) +} + func TestAutoCapture(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean()