Skip to content

Commit

Permalink
telemetry, session: add telemetry collector for placement policies (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored Mar 17, 2022
1 parent af1ea80 commit eb4cfce
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 48 deletions.
3 changes: 3 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,9 @@ func (do *Domain) MockInfoCacheAndLoadInfoSchema(is infoschema.InfoSchema) {

func init() {
initByLDFlagsForGlobalKill()
telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema {
return GetDomain(ctx).InfoSchema()
}
}

var (
Expand Down
28 changes: 0 additions & 28 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3378,34 +3378,6 @@ func (s *session) GetTxnWriteThroughputSLI() *sli.TxnWriteThroughputSLI {
return &s.txn.writeSLI
}

var _ telemetry.TemporaryOrCacheTableFeatureChecker = &session{}

// TemporaryTableExists is used by the telemetry package to avoid circle dependency.
func (s *session) TemporaryTableExists() bool {
is := domain.GetDomain(s).InfoSchema()
for _, dbInfo := range is.AllSchemas() {
for _, tbInfo := range is.SchemaTables(dbInfo.Name) {
if tbInfo.Meta().TempTableType != model.TempTableNone {
return true
}
}
}
return false
}

// CachedTableExists is used by the telemetry package to avoid circle dependency.
func (s *session) CachedTableExists() bool {
is := domain.GetDomain(s).InfoSchema()
for _, dbInfo := range is.AllSchemas() {
for _, tbInfo := range is.SchemaTables(dbInfo.Name) {
if tbInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return true
}
}
}
return false
}

// GetInfoSchema returns snapshotInfoSchema if snapshot schema is set.
// Transaction infoschema is returned if inside an explicit txn.
// Otherwise the latest infoschema is returned.
Expand Down
81 changes: 61 additions & 20 deletions telemetry/data_feature_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,82 @@ type featureUsage struct {
Txn *TxnUsage `json:"txn"`
// cluster index usage information
// key is the first 6 characters of sha2(TABLE_NAME, 256)
ClusterIndex *ClusterIndexUsage `json:"clusterIndex"`
TemporaryTable bool `json:"temporaryTable"`
CTE *m.CTEUsageCounter `json:"cte"`
CachedTable bool `json:"cachedTable"`
AutoCapture bool `json:"autoCapture"`
ClusterIndex *ClusterIndexUsage `json:"clusterIndex"`
TemporaryTable bool `json:"temporaryTable"`
CTE *m.CTEUsageCounter `json:"cte"`
CachedTable bool `json:"cachedTable"`
AutoCapture bool `json:"autoCapture"`
PlacementPolicyUsage *placementPolicyUsage `json:"placementPolicy"`
}

type placementPolicyUsage struct {
NumPlacementPolicies uint64 `json:"numPlacementPolicies"`
NumDBWithPolicies uint64 `json:"numDBWithPolicies"`
NumTableWithPolicies uint64 `json:"numTableWithPolicies"`
// The number of partitions that policies are explicitly specified.
NumPartitionWithExplicitPolicies uint64 `json:"numPartitionWithExplicitPolicies"`
}

func getFeatureUsage(ctx sessionctx.Context) (*featureUsage, error) {
clusterIdxUsage, err := getClusterIndexUsageInfo(ctx)
var usage featureUsage
var err error
usage.ClusterIndex, err = getClusterIndexUsageInfo(ctx)
if err != nil {
logutil.BgLogger().Info(err.Error())
return nil, err
}

// transaction related feature
txnUsage := getTxnUsageInfo(ctx)
usage.Txn = getTxnUsageInfo(ctx)

usage.CTE = getCTEUsageInfo()

// Avoid the circle dependency.
temporaryTable := ctx.(TemporaryOrCacheTableFeatureChecker).TemporaryTableExists()
usage.AutoCapture = getAutoCaptureUsageInfo(ctx)

cteUsage := getCTEUsageInfo()
collectFeatureUsageFromInfoschema(ctx, &usage)
return &usage, nil
}

cachedTable := ctx.(TemporaryOrCacheTableFeatureChecker).CachedTableExists()
// collectFeatureUsageFromInfoschema updates the usage for temporary table, cached table and placement policies.
func collectFeatureUsageFromInfoschema(ctx sessionctx.Context, usage *featureUsage) {
if usage.PlacementPolicyUsage == nil {
usage.PlacementPolicyUsage = &placementPolicyUsage{}
}
is := GetDomainInfoSchema(ctx)
for _, dbInfo := range is.AllSchemas() {
if dbInfo.PlacementPolicyRef != nil {
usage.PlacementPolicyUsage.NumDBWithPolicies++
}

enableAutoCapture := getAutoCaptureUsageInfo(ctx)
return &featureUsage{txnUsage, clusterIdxUsage, temporaryTable, cteUsage, cachedTable, enableAutoCapture}, nil
for _, tbInfo := range is.SchemaTables(dbInfo.Name) {
if tbInfo.Meta().TempTableType != model.TempTableNone {
usage.TemporaryTable = true
}
if tbInfo.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
usage.CachedTable = true
}
if tbInfo.Meta().PlacementPolicyRef != nil {
usage.PlacementPolicyUsage.NumTableWithPolicies++
}
partitions := tbInfo.Meta().GetPartitionInfo()
if partitions == nil {
continue
}
for _, partitionInfo := range partitions.Definitions {
if partitionInfo.PlacementPolicyRef != nil {
usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies++
}
}
}
}

usage.PlacementPolicyUsage.NumPlacementPolicies += uint64(len(is.AllPlacementPolicies()))
}

// GetDomainInfoSchema is used by the telemetry package to get the latest schema information
// while avoiding circle dependency with domain package.
var GetDomainInfoSchema func(sessionctx.Context) infoschema.InfoSchema

// ClusterIndexUsage records the usage info of all the tables, no more than 10k tables
type ClusterIndexUsage map[string]TableClusteredInfo

Expand Down Expand Up @@ -139,13 +187,6 @@ func getClusterIndexUsageInfo(ctx sessionctx.Context) (cu *ClusterIndexUsage, er
return &usage, nil
}

// TemporaryOrCacheTableFeatureChecker is defined to avoid package circle dependency.
// The session struct implements this interface.
type TemporaryOrCacheTableFeatureChecker interface {
TemporaryTableExists() bool
CachedTableExists() bool
}

// TxnUsage records the usage info of transaction related features, including
// async-commit, 1PC and counters of transactions committed with different protocols.
type TxnUsage struct {
Expand Down
43 changes: 43 additions & 0 deletions telemetry/data_feature_usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,49 @@ func TestCachedTable(t *testing.T) {
require.False(t, usage.CachedTable)
}

func TestPlacementPolicies(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

usage, err := telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumPlacementPolicies)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumDBWithPolicies)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumTableWithPolicies)
require.Equal(t, uint64(0), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies)

tk.MustExec("create placement policy p1 followers=4;")
tk.MustExec(`create placement policy p2 primary_region="cn-east-1" regions="cn-east-1,cn-east"`)
tk.MustExec(`create placement policy p3 followers=3`)
tk.MustExec("alter database test placement policy=p1;")
tk.MustExec("create table t1(a int);")
tk.MustExec("create table t2(a int) placement policy=p2;")
tk.MustExec("create table t3(id int) PARTITION BY RANGE (id) (" +
"PARTITION p0 VALUES LESS THAN (100) placement policy p3," +
"PARTITION p1 VALUES LESS THAN (1000))")

usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(3), usage.PlacementPolicyUsage.NumPlacementPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumDBWithPolicies)
require.Equal(t, uint64(3), usage.PlacementPolicyUsage.NumTableWithPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies)

tk.MustExec("drop table t2;")
tk.MustExec("drop placement policy p2;")
tk.MustExec("alter table t3 placement policy=default")

usage, err = telemetry.GetFeatureUsage(tk.Session())
require.NoError(t, err)
require.Equal(t, uint64(2), usage.PlacementPolicyUsage.NumPlacementPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumDBWithPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumTableWithPolicies)
require.Equal(t, uint64(1), usage.PlacementPolicyUsage.NumPartitionWithExplicitPolicies)
}

func TestAutoCapture(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down

0 comments on commit eb4cfce

Please sign in to comment.