Skip to content

Commit

Permalink
slack-19.0: v20 backports, pt. 2 (#459)
Browse files Browse the repository at this point in the history
* `vtgate`: support filtering tablets by tablet-tags  (vitessio#15911)

Signed-off-by: Tim Vaillancourt <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>

* Add support for sampling rate in `streamlog` (vitessio#15919)

Signed-off-by: Tim Vaillancourt <[email protected]>

* Add sql text counts stats to `vtcombo`,`vtgate`+`vttablet` (vitessio#15897)

Signed-off-by: Tim Vaillancourt <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: Harshit Gangal <[email protected]>
Co-authored-by: Deepthi Sigireddi <[email protected]>

---------

Signed-off-by: Tim Vaillancourt <[email protected]>
Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
Co-authored-by: Harshit Gangal <[email protected]>
Co-authored-by: Deepthi Sigireddi <[email protected]>
  • Loading branch information
4 people authored Jul 18, 2024
1 parent 1298a0f commit a0a9e11
Show file tree
Hide file tree
Showing 13 changed files with 307 additions and 38 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ Flags:
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables).
--queryserver-config-annotate-queries prefix queries to MySQL backend with comment indicating vtgate principal (user) and target tablet type
--queryserver-config-enable-table-acl-dry-run If this flag is enabled, tabletserver will emit monitoring metrics and let the request pass regardless of table acl check results
Expand Down Expand Up @@ -337,6 +338,7 @@ Flags:
--structured-logging enable structured logging
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--table_gc_lifecycle string States for a DROP TABLE garbage collection cycle. Default is 'hold,purge,evac,drop', use any subset ('drop' implicitly always included) (default "hold,purge,evac,drop")
--tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.
--tablet_dir string The directory within the vtdataroot to store vttablet/mysql files. Defaults to being generated by the tablet uid.
--tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ Flags:
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--retry-count int retry count (default 2)
Expand All @@ -194,6 +195,7 @@ Flags:
--stream_buffer_size int the number of bytes sent from vtgate for each stream call. It's recommended to keep this value in sync with vttablet's query-server-config-stream-buffer-size. (default 32768)
--structured-logging enable structured logging
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet-filter-tags StringMap Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.
--tablet_filters strings Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.
--tablet_grpc_ca string the server ca to use to validate servers when connecting
--tablet_grpc_cert string the cert to use to connect
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ Flags:
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables).
--queryserver-config-annotate-queries prefix queries to MySQL backend with comment indicating vtgate principal (user) and target tablet type
--queryserver-config-enable-table-acl-dry-run If this flag is enabled, tabletserver will emit monitoring metrics and let the request pass regardless of table acl check results
Expand Down
21 changes: 21 additions & 0 deletions go/streamlog/streamlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package streamlog
import (
"fmt"
"io"
rand "math/rand/v2"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -51,6 +52,7 @@ var (
queryLogFilterTag string
queryLogRowThreshold uint64
queryLogFormat = "text"
queryLogSampleRate float64
)

func GetRedactDebugUIQueries() bool {
Expand All @@ -69,6 +71,10 @@ func SetQueryLogRowThreshold(newQueryLogRowThreshold uint64) {
queryLogRowThreshold = newQueryLogRowThreshold
}

func SetQueryLogSampleRate(sampleRate float64) {
queryLogSampleRate = sampleRate
}

func GetQueryLogFormat() string {
return queryLogFormat
}
Expand Down Expand Up @@ -96,6 +102,8 @@ func registerStreamLogFlags(fs *pflag.FlagSet) {
// QueryLogRowThreshold only log queries returning or affecting this many rows
fs.Uint64Var(&queryLogRowThreshold, "querylog-row-threshold", queryLogRowThreshold, "Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.")

// QueryLogSampleRate causes a sample of queries to be logged
fs.Float64Var(&queryLogSampleRate, "querylog-sample-rate", queryLogSampleRate, "Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)")
}

const (
Expand Down Expand Up @@ -249,9 +257,22 @@ func GetFormatter[T any](logger *StreamLogger[T]) LogFormatter {
}
}

// shouldSampleQuery returns true if a query should be sampled based on queryLogSampleRate
func shouldSampleQuery() bool {
if queryLogSampleRate <= 0 {
return false
} else if queryLogSampleRate >= 1 {
return true
}
return rand.Float64() <= queryLogSampleRate
}

// ShouldEmitLog returns whether the log with the given SQL query
// should be emitted or filtered
func ShouldEmitLog(sql string, rowsAffected, rowsReturned uint64) bool {
if shouldSampleQuery() {
return true
}
if queryLogRowThreshold > max(rowsAffected, rowsReturned) && queryLogFilterTag == "" {
return false
}
Expand Down
131 changes: 131 additions & 0 deletions go/streamlog/streamlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/servenv"
)

Expand Down Expand Up @@ -260,3 +263,131 @@ func TestFile(t *testing.T) {
t.Errorf("streamlog file: want %q got %q", want, got)
}
}

func TestShouldSampleQuery(t *testing.T) {
queryLogSampleRate = -1
assert.False(t, shouldSampleQuery())

queryLogSampleRate = 0
assert.False(t, shouldSampleQuery())

// for test coverage, can't test a random result
queryLogSampleRate = 0.5
shouldSampleQuery()

queryLogSampleRate = 1.0
assert.True(t, shouldSampleQuery())

queryLogSampleRate = 100.0
assert.True(t, shouldSampleQuery())
}

func TestShouldEmitLog(t *testing.T) {
origQueryLogFilterTag := queryLogFilterTag
origQueryLogRowThreshold := queryLogRowThreshold
origQueryLogSampleRate := queryLogSampleRate
defer func() {
SetQueryLogFilterTag(origQueryLogFilterTag)
SetQueryLogRowThreshold(origQueryLogRowThreshold)
SetQueryLogSampleRate(origQueryLogSampleRate)
}()

tests := []struct {
sql string
qLogFilterTag string
qLogRowThreshold uint64
qLogSampleRate float64
rowsAffected uint64
rowsReturned uint64
ok bool
}{
{
sql: "queryLogThreshold smaller than affected and returned",
qLogFilterTag: "",
qLogRowThreshold: 2,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 7,
ok: true,
},
{
sql: "queryLogThreshold greater than affected and returned",
qLogFilterTag: "",
qLogRowThreshold: 27,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 17,
ok: false,
},
{
sql: "this doesn't contains queryFilterTag: TAG",
qLogFilterTag: "special tag",
qLogRowThreshold: 10,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 17,
ok: false,
},
{
sql: "this contains queryFilterTag: TAG",
qLogFilterTag: "TAG",
qLogRowThreshold: 0,
qLogSampleRate: 0.0,
rowsAffected: 7,
rowsReturned: 17,
ok: true,
},
{
sql: "this contains querySampleRate: 1.0",
qLogFilterTag: "",
qLogRowThreshold: 0,
qLogSampleRate: 1.0,
rowsAffected: 7,
rowsReturned: 17,
ok: true,
},
{
sql: "this contains querySampleRate: 1.0 without expected queryFilterTag",
qLogFilterTag: "TAG",
qLogRowThreshold: 0,
qLogSampleRate: 1.0,
rowsAffected: 7,
rowsReturned: 17,
ok: true,
},
}

for _, tt := range tests {
t.Run(tt.sql, func(t *testing.T) {
SetQueryLogFilterTag(tt.qLogFilterTag)
SetQueryLogRowThreshold(tt.qLogRowThreshold)
SetQueryLogSampleRate(tt.qLogSampleRate)

require.Equal(t, tt.ok, ShouldEmitLog(tt.sql, tt.rowsAffected, tt.rowsReturned))
})
}
}

func BenchmarkShouldEmitLog(b *testing.B) {
b.Run("default", func(b *testing.B) {
SetQueryLogSampleRate(0.0)
for i := 0; i < b.N; i++ {
ShouldEmitLog("select * from test where user='someone'", 0, 123)
}
})
b.Run("filter_tag", func(b *testing.B) {
SetQueryLogSampleRate(0.0)
SetQueryLogFilterTag("LOG_QUERY")
defer SetQueryLogFilterTag("")
for i := 0; i < b.N; i++ {
ShouldEmitLog("select /* LOG_QUERY=1 */ * from test where user='someone'", 0, 123)
}
})
b.Run("50%_sample_rate", func(b *testing.B) {
SetQueryLogSampleRate(0.5)
defer SetQueryLogSampleRate(0.0)
for i := 0; i < b.N; i++ {
ShouldEmitLog("select * from test where user='someone'", 0, 123)
}
})
}
16 changes: 12 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -82,6 +83,9 @@ var (
// tabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets.
tabletFilters []string

// tabletFilterTags are the tablet tag filters (as key:value pairs) to apply to the full set of tablets.
tabletFilterTags flagutil.StringMapValue

// refreshInterval is the interval at which healthcheck refreshes its list of tablets from topo.
refreshInterval = 1 * time.Minute

Expand Down Expand Up @@ -164,6 +168,7 @@ func init() {

func registerDiscoveryFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&tabletFilters, "tablet_filters", []string{}, "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch.")
fs.Var(&tabletFilterTags, "tablet-filter-tags", "Specifies a comma-separated list of tablet tags (as key:value pairs) to filter the tablets to watch.")
fs.Var((*topoproto.TabletTypeListFlag)(&AllowedTabletTypes), "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to. Should be provided as a comma-separated set of tablet types.")
fs.StringSliceVar(&KeyspacesToWatch, "keyspaces_to_watch", []string{}, "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema.")
}
Expand Down Expand Up @@ -337,13 +342,13 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
loadTabletsTrigger: make(chan struct{}),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
}

for _, c := range cells {
var filters TabletFilters
log.Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
Expand All @@ -357,11 +362,14 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
42 changes: 42 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ type TabletFilter interface {
IsIncluded(tablet *topodata.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
}
}
return true
}

// FilterByShard is a filter that filters tablets by
// keyspace/shard.
type FilterByShard struct {
Expand Down Expand Up @@ -375,3 +388,32 @@ func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}

// FilterByTabletTags is a filter that filters tablets by tablet tag key/values.
type FilterByTabletTags struct {
tags map[string]string
}

// NewFilterByTabletTags creates a new FilterByTabletTags. All tablets that match
// all tablet tags will be forwarded to the TopologyWatcher's consumer.
func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
return &FilterByTabletTags{
tags: tabletTags,
}
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
if fbtg.tags == nil {
return true
}
if tablet.Tags == nil {
return false
}
for key, val := range fbtg.tags {
if tabletVal, found := tablet.Tags[key]; !found || tabletVal != val {
return false
}
}
return true
}
Loading

0 comments on commit a0a9e11

Please sign in to comment.