Skip to content

Commit

Permalink
Deprecate old metrics in VTOrc and replace with new ones (#15994)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored May 28, 2024
1 parent 8f3c035 commit 0c2856e
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 9 deletions.
19 changes: 19 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [vitess/base and vitess/k8s Docker images](#base-k8s-images)
- [`gh-ost` binary and endtoend tests](#gh-ost-binary-tests-removal)
- **[Breaking changes](#breaking-changes)**
- [Metric Name Changes in VTOrc](#metric-change-vtorc)
- [ENUM and SET column handling in VTGate VStream API](#enum-set-vstream)
- [`shutdown_grace_period` Default Change](#shutdown-grace-period-default)
- [New `unmanaged` Flag and `disable_active_reparents` deprecation](#unmanaged-flag)
Expand Down Expand Up @@ -108,6 +109,24 @@ Vitess' endtoend tests no longer use nor test `gh-ost` migrations.

### <a id="breaking-changes"/>Breaking Changes

#### <a id="metric-change-vtorc"/>Metric Name Changes in VTOrc

The following metric names have been changed in VTOrc. The old metrics are still available in `/debug/vars` for this release, but will be removed in later releases. The new metric names and the deprecated metric names resolve to the same metric name on prometheus, so there is no change there.

| Old Metric Name | New Metric Name | Name in Prometheus |
|:--------------------------------------------:|:----------------------------------------:|:--------------------------------------------------:|
| `analysis.change.write` | `AnalysisChangeWrite` | `vtorc_analysis_change_write` |
| `audit.write` | `AuditWrite` | `vtorc_audit_write` |
| `discoveries.attempt` | `DiscoveriesAttempt` | `vtorc_discoveries_attempt` |
| `discoveries.fail` | `DiscoveriesFail` | `vtorc_discoveries_fail` |
| `discoveries.instance_poll_seconds_exceeded` | `DiscoveriesInstancePollSecondsExceeded` | `vtorc_discoveries_instance_poll_seconds_exceeded` |
| `discoveries.queue_length` | `DiscoveriesQueueLength` | `vtorc_discoveries_queue_length` |
| `discoveries.recent_count` | `DiscoveriesRecentCount` | `vtorc_discoveries_recent_count` |
| `instance.read` | `InstanceRead` | `vtorc_instance_read` |
| `instance.read_topology` | `InstanceReadTopology` | `vtorc_instance_read_topology` |

#### <a id="enum-set-vstream"/>ENUM and SET column handling in VTGate VStream API

The [VTGate VStream API](https://vitess.io/docs/reference/vreplication/vstream/) now returns [`ENUM`](https://dev.mysql.com/doc/refman/en/enum.html) and [`SET`](https://dev.mysql.com/doc/refman/en/set.html) column type values in [`VEvent`](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#VEvent) messages (in the embedded [`RowChange`](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#RowChange) messages) as their string values instead of the integer based ones — in both the copy/snapshot phase and the streaming phase. This change was done to make the `VStream` API more user-friendly, intuitive, and to align the behavior across both phases. Before [this change](https://github.com/vitessio/vitess/pull/15723) the values for [`ENUM`](https://dev.mysql.com/doc/refman/en/enum.html) and [`SET`](https://dev.mysql.com/doc/refman/en/set.html) columns were string values in the copy phase but integer values (which only have an internal meaning to MySQL) in the streaming phase. This inconsistency led to various [challenges and issues](https://github.com/vitessio/vitess/issues/15750) for each `VStream` client/consumer (e.g. the [`Debezium` Vitess connector](https://debezium.io/documentation/reference/stable/connectors/vitess.html) failed to properly perform a snapshot for tables containing these column types). Now the behavior is intuitive — clients need the string values as the eventual sink is often not MySQL so each consumer needed to perform the mappings themselves — and consistent. While this is a (potentially) breaking change, a new boolean field has been added to the [`FieldEvent`](https://pkg.go.dev/vitess.io/vitess/go/vt/proto/binlogdata#FieldEvent) message called `EnumSetStringValues`. When that field is `false` (in Vitess v19 and older) then the consumer will need to perform the mappings during streaming phase, but not during copy phase. When this field is `true`, then no mapping is required. This will help to ensure a smooth transition for all consumers over time. To demonstrate, let's look at the textual output (printing the received `VEvents` as strings) when streaming a single `enum_set_test` table from the unsharded `commerce` keyspace so that we can see what the VStream looks like before and after when we start a new VStream in copy/snapshot mode and then transition to streaming mode for the following table:
Expand Down
34 changes: 34 additions & 0 deletions go/stats/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package stats

import (
"expvar"
"fmt"
"math"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -45,6 +47,22 @@ func NewCounter(name string, help string) *Counter {
return v
}

// NewCounterWithDeprecatedName returns a new Counter that also has a deprecated name that can be removed in a future release.
// It is important to ensure that we only call this function with values for name and deprecatedName such that they match to the same
// metric name in snake case.
func NewCounterWithDeprecatedName(name string, deprecatedName string, help string) *Counter {
// Ensure that the snake case for the deprecated name and the new name are the same.
if deprecatedName == "" || GetSnakeName(name) != GetSnakeName(deprecatedName) {
panic(fmt.Sprintf("New name for deprecated metric doesn't have the same snake case - %v", deprecatedName))
}
v := &Counter{help: help}
// We want to publish the deprecated name for backward compatibility.
// At the same time we want the new metric to be visible on the `/debug/vars` page, so we publish the new name in expvar.
publish(deprecatedName, v)
expvar.Publish(name, v)
return v
}

// Add adds the provided value to the Counter.
func (v *Counter) Add(delta int64) {
if delta < 0 {
Expand Down Expand Up @@ -136,6 +154,22 @@ func NewGauge(name string, help string) *Gauge {
return v
}

// NewGaugeWithDeprecatedName creates a new Gauge and publishes it if name is set that also has a deprecated name that can be removed in a future release.
// It is important to ensure that we only call this function with values for name and deprecatedName such that they match to the same metric name in snake case.
func NewGaugeWithDeprecatedName(name string, deprecatedName string, help string) *Gauge {
// Ensure that the snake case for the deprecated name and the new name are the same.
if deprecatedName == "" || GetSnakeName(name) != GetSnakeName(deprecatedName) {
panic(fmt.Sprintf("New name for deprecated metric doesn't have the same snake case - %v", deprecatedName))
}
v := &Gauge{Counter: Counter{help: help}}

// We want to publish the deprecated name for backward compatibility.
// At the same time we want the new metric to be visible on the `/debug/vars` page, so we publish the new name in expvar.
publish(deprecatedName, v)
expvar.Publish(name, v)
return v
}

// Set overwrites the current value.
func (v *Gauge) Set(value int64) {
v.Counter.i.Store(value)
Expand Down
95 changes: 95 additions & 0 deletions go/stats/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package stats

import (
"expvar"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCounter(t *testing.T) {
Expand Down Expand Up @@ -91,3 +94,95 @@ func TestGaugeFloat64(t *testing.T) {
v.Reset()
assert.Equal(t, float64(0), v.Get())
}

func TestNewCounterWithDeprecatedName(t *testing.T) {
clearStats()
Register(func(name string, v expvar.Var) {})

testcases := []struct {
name string
deprecatedName string
shouldPanic bool
}{
{
name: "new_name",
deprecatedName: "deprecatedName",
shouldPanic: true,
},
{
name: "metricName_test",
deprecatedName: "metric.name-test",
shouldPanic: false,
},
{
name: "MetricNameTesting",
deprecatedName: "metric.name.testing",
shouldPanic: false,
},
}

for _, testcase := range testcases {
t.Run(fmt.Sprintf("%v-%v", testcase.name, testcase.deprecatedName), func(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
panicReceived := false
go func() {
defer func() {
if x := recover(); x != nil {
panicReceived = true
}
wg.Done()
}()
NewCounterWithDeprecatedName(testcase.name, testcase.deprecatedName, "help")
}()
wg.Wait()
require.EqualValues(t, testcase.shouldPanic, panicReceived)
})
}
}

func TestNewGaugeWithDeprecatedName(t *testing.T) {
clearStats()
Register(func(name string, v expvar.Var) {})

testcases := []struct {
name string
deprecatedName string
shouldPanic bool
}{
{
name: "gauge_new_name",
deprecatedName: "gauge_deprecatedName",
shouldPanic: true,
},
{
name: "gauge-metricName_test",
deprecatedName: "gauge_metric.name-test",
shouldPanic: false,
},
{
name: "GaugeMetricNameTesting",
deprecatedName: "gauge.metric.name.testing",
shouldPanic: false,
},
}

for _, testcase := range testcases {
t.Run(fmt.Sprintf("%v-%v", testcase.name, testcase.deprecatedName), func(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
panicReceived := false
go func() {
defer func() {
if x := recover(); x != nil {
panicReceived = true
}
wg.Done()
}()
NewGaugeWithDeprecatedName(testcase.name, testcase.deprecatedName, "help")
}()
wg.Wait()
require.EqualValues(t, testcase.shouldPanic, panicReceived)
})
}
}
16 changes: 16 additions & 0 deletions go/test/endtoend/cluster/vtorc_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,22 @@ func (orc *VTOrcProcess) GetVars() map[string]any {
return nil
}

// GetMetrics gets the metrics exported on the /metrics page of VTOrc
func (orc *VTOrcProcess) GetMetrics() string {
varsURL := fmt.Sprintf("http://localhost:%d/metrics", orc.Port)
resp, err := http.Get(varsURL)
if err != nil {
return ""
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
respByte, _ := io.ReadAll(resp.Body)
return string(respByte)
}
return ""
}

// MakeAPICall makes an API call on the given endpoint of VTOrc
func (orc *VTOrcProcess) MakeAPICall(endpoint string) (status int, response string, err error) {
url := fmt.Sprintf("http://localhost:%d/%s", orc.Port, endpoint)
Expand Down
35 changes: 35 additions & 0 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,41 @@ func TestAPIEndpoints(t *testing.T) {
},`)
})

t.Run("Check Vars and Metrics", func(t *testing.T) {
// These are vars that will be deprecated in v21.
utils.CheckVarExists(t, vtorc, "analysis.change.write")
utils.CheckVarExists(t, vtorc, "audit.write")
utils.CheckVarExists(t, vtorc, "discoveries.attempt")
utils.CheckVarExists(t, vtorc, "discoveries.fail")
utils.CheckVarExists(t, vtorc, "discoveries.instance_poll_seconds_exceeded")
utils.CheckVarExists(t, vtorc, "discoveries.queue_length")
utils.CheckVarExists(t, vtorc, "discoveries.recent_count")
utils.CheckVarExists(t, vtorc, "instance.read")
utils.CheckVarExists(t, vtorc, "instance.read_topology")

// Newly added vars.
utils.CheckVarExists(t, vtorc, "AnalysisChangeWrite")
utils.CheckVarExists(t, vtorc, "AuditWrite")
utils.CheckVarExists(t, vtorc, "DiscoveriesAttempt")
utils.CheckVarExists(t, vtorc, "DiscoveriesFail")
utils.CheckVarExists(t, vtorc, "DiscoveriesInstancePollSecondsExceeded")
utils.CheckVarExists(t, vtorc, "DiscoveriesQueueLength")
utils.CheckVarExists(t, vtorc, "DiscoveriesRecentCount")
utils.CheckVarExists(t, vtorc, "InstanceRead")
utils.CheckVarExists(t, vtorc, "InstanceReadTopology")

// Metrics registered in prometheus
utils.CheckMetricExists(t, vtorc, "vtorc_analysis_change_write")
utils.CheckMetricExists(t, vtorc, "vtorc_audit_write")
utils.CheckMetricExists(t, vtorc, "vtorc_discoveries_attempt")
utils.CheckMetricExists(t, vtorc, "vtorc_discoveries_fail")
utils.CheckMetricExists(t, vtorc, "vtorc_discoveries_instance_poll_seconds_exceeded")
utils.CheckMetricExists(t, vtorc, "vtorc_discoveries_queue_length")
utils.CheckMetricExists(t, vtorc, "vtorc_discoveries_recent_count")
utils.CheckMetricExists(t, vtorc, "vtorc_instance_read")
utils.CheckMetricExists(t, vtorc, "vtorc_instance_read_topology")
})

t.Run("Disable Recoveries API", func(t *testing.T) {
// Disable recoveries of VTOrc
status, resp, err := utils.MakeAPICall(t, vtorc, "/api/disable-global-recoveries")
Expand Down
15 changes: 15 additions & 0 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,21 @@ func WaitForSuccessfulERSCount(t *testing.T, vtorcInstance *cluster.VTOrcProcess
assert.EqualValues(t, countExpected, successCount)
}

// CheckVarExists checks whether the given metric exists or not in /debug/vars.
func CheckVarExists(t *testing.T, vtorcInstance *cluster.VTOrcProcess, metricName string) {
t.Helper()
vars := vtorcInstance.GetVars()
_, exists := vars[metricName]
assert.True(t, exists)
}

// CheckMetricExists checks whether the given metric exists or not in /metrics.
func CheckMetricExists(t *testing.T, vtorcInstance *cluster.VTOrcProcess, metricName string) {
t.Helper()
metrics := vtorcInstance.GetMetrics()
assert.Contains(t, metrics, metricName)
}

// getIntFromValue is a helper function to get an integer from the given value.
// If it is convertible to a float, then we round the number to the nearest integer.
// If the value is not numeric at all, we return 0.
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (
"vitess.io/vitess/go/vt/vtorc/util"
)

var analysisChangeWriteCounter = stats.NewCounter("analysis.change.write", "Number of times analysis has changed")
// The metric is registered with a deprecated name. The old metric name can be removed in v21.
var analysisChangeWriteCounter = stats.NewCounterWithDeprecatedName("AnalysisChangeWrite", "analysis.change.write", "Number of times analysis has changed")

var recentInstantAnalysis *cache.Cache

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtorc/inst/audit_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
"vitess.io/vitess/go/vt/vtorc/db"
)

var auditOperationCounter = stats.NewCounter("audit.write", "Number of audit operations performed")
// The metric is registered with a deprecated name. The old metric name can be removed in v21.
var auditOperationCounter = stats.NewCounterWithDeprecatedName("AuditWrite", "audit.write", "Number of audit operations performed")

// AuditOperation creates and writes a new audit entry by given params
func AuditOperation(auditType string, tabletAlias string, message string) error {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ var (
var forgetAliases *cache.Cache

var (
readTopologyInstanceCounter = stats.NewCounter("instance.read_topology", "Number of times an instance was read from the topology")
readInstanceCounter = stats.NewCounter("instance.read", "Number of times an instance was read")
// The metrics are registered with deprecated names. The old metric names can be removed in v21.
readTopologyInstanceCounter = stats.NewCounterWithDeprecatedName("InstanceReadTopology", "instance.read_topology", "Number of times an instance was read from the topology")
readInstanceCounter = stats.NewCounterWithDeprecatedName("InstanceRead", "instance.read", "Number of times an instance was read")
backendWrites = collection.CreateOrReturnCollection("BACKEND_WRITES")
writeBufferLatency = stopwatch.NewNamedStopwatch()
)
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ var snapshotDiscoveryKeys chan string
var snapshotDiscoveryKeysMutex sync.Mutex
var hasReceivedSIGTERM int32

var discoveriesCounter = stats.NewCounter("discoveries.attempt", "Number of discoveries attempted")
var failedDiscoveriesCounter = stats.NewCounter("discoveries.fail", "Number of failed discoveries")
var instancePollSecondsExceededCounter = stats.NewCounter("discoveries.instance_poll_seconds_exceeded", "Number of instances that took longer than InstancePollSeconds to poll")
var discoveryQueueLengthGauge = stats.NewGauge("discoveries.queue_length", "Length of the discovery queue")
var discoveryRecentCountGauge = stats.NewGauge("discoveries.recent_count", "Number of recent discoveries")
// The metrics are registered with deprecated names. The old metric names can be removed in v21.
var discoveriesCounter = stats.NewCounterWithDeprecatedName("DiscoveriesAttempt", "discoveries.attempt", "Number of discoveries attempted")
var failedDiscoveriesCounter = stats.NewCounterWithDeprecatedName("DiscoveriesFail", "discoveries.fail", "Number of failed discoveries")
var instancePollSecondsExceededCounter = stats.NewCounterWithDeprecatedName("DiscoveriesInstancePollSecondsExceeded", "discoveries.instance_poll_seconds_exceeded", "Number of instances that took longer than InstancePollSeconds to poll")
var discoveryQueueLengthGauge = stats.NewGaugeWithDeprecatedName("DiscoveriesQueueLength", "discoveries.queue_length", "Length of the discovery queue")
var discoveryRecentCountGauge = stats.NewGaugeWithDeprecatedName("DiscoveriesRecentCount", "discoveries.recent_count", "Number of recent discoveries")
var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName)

var recentDiscoveryOperationKeys *cache.Cache
Expand Down

0 comments on commit 0c2856e

Please sign in to comment.