release-22.2: Changefeed performance improvements #91002

Merged 13 commits on Nov 4, 2022
4 changes: 4 additions & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -10,6 +10,10 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks. Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
4 changes: 4 additions & 0 deletions docs/generated/settings/settings.html
@@ -16,6 +16,10 @@
<tr><td><code>bulkio.backup.read_timeout</code></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td></tr>
<tr><td><code>bulkio.backup.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.balance_range_distribution.enable</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value</td></tr>
<tr><td><code>changefeed.fast_gzip.enabled</code></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeds</td></tr>
<tr><td><code>changefeed.schema_feed.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
<tr><td><code>cloudstorage.azure.concurrent_upload_buffers</code></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks. Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td></tr>
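The four changefeed settings added above are ordinary cluster settings and can be toggled per cluster. Below is a minimal sketch of enabling them from a Go client, assuming a locally reachable CockroachDB node and the standard database/sql interface with the lib/pq driver; the values shown are illustrative, not tuning recommendations.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; pgx would work as well
)

func main() {
	// Connection string is illustrative; adjust host, port, and credentials.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Example values only: enable parallel event consumers, balanced range
	// distribution, and the fast gzip path introduced by this change.
	stmts := []string{
		"SET CLUSTER SETTING changefeed.event_consumer_workers = 8",
		"SET CLUSTER SETTING changefeed.event_consumer_worker_queue_size = 16",
		"SET CLUSTER SETTING changefeed.balance_range_distribution.enable = true",
		"SET CLUSTER SETTING changefeed.fast_gzip.enabled = true",
	}
	for _, s := range stmts {
		if _, err := db.Exec(s); err != nil {
			log.Fatalf("%s: %v", s, err)
		}
	}
}
```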
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
@@ -588,6 +588,7 @@ ALL_TESTS = [
"//pkg/util/span:span_test",
"//pkg/util/stop:stop_test",
"//pkg/util/stringarena:stringarena_test",
"//pkg/util/strutil:strutil_test",
"//pkg/util/syncutil/singleflight:singleflight_test",
"//pkg/util/syncutil:syncutil_test",
"//pkg/util/sysutil:sysutil_test",
@@ -2016,6 +2017,8 @@ GO_TARGETS = [
"//pkg/util/stringarena:stringarena",
"//pkg/util/stringarena:stringarena_test",
"//pkg/util/stringencoding:stringencoding",
"//pkg/util/strutil:strutil",
"//pkg/util/strutil:strutil_test",
"//pkg/util/syncutil/singleflight:singleflight",
"//pkg/util/syncutil/singleflight:singleflight_test",
"//pkg/util/syncutil:syncutil",
@@ -2969,6 +2972,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/stop:get_x_data",
"//pkg/util/stringarena:get_x_data",
"//pkg/util/stringencoding:get_x_data",
"//pkg/util/strutil:get_x_data",
"//pkg/util/syncutil:get_x_data",
"//pkg/util/syncutil/singleflight:get_x_data",
"//pkg/util/system:get_x_data",
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"changefeed_dist.go",
"changefeed_processors.go",
"changefeed_stmt.go",
"compression.go",
"doc.go",
"encoder.go",
"encoder_avro.go",
@@ -58,6 +59,7 @@ go_library(
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/protectedts",
@@ -96,6 +98,7 @@ go_library(
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
@@ -227,6 +230,7 @@ go_test(
"//pkg/sql/parser",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/avro.go
@@ -607,6 +607,7 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
},
func(d tree.Datum, memo interface{}) (interface{}, error) {
datumArr := d.(*tree.DArray)

var avroArr []interface{}
if memo != nil {
avroArr = memo.([]interface{})
@@ -616,14 +617,13 @@ func typeToAvroSchema(typ *types.T) (*avroSchemaField, error) {
} else {
avroArr = make([]interface{}, 0, datumArr.Len())
}

for i, elt := range datumArr.Array {
var encoded interface{}
if elt == tree.DNull {
encoded = nil
} else {
var encErr error
if i < len(avroArr) {
if i < len(avroArr) && avroArr[i] != nil {
encoded, encErr = itemSchema.encodeDatum(elt, avroArr[i].(map[string]interface{})[itemUnionKey])
} else {
encoded, encErr = itemSchema.encodeDatum(elt, nil)
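For context on the guard added in this hunk: when an array element was NULL during a previous encode, its memoized slot holds nil, and type-asserting that nil interface value to a map panics. A minimal standalone Go sketch of the hazard (illustrative only, not code from this change):

```go
package main

import "fmt"

func main() {
	// Memoized encodings from a previous row; the second element was NULL.
	memo := []interface{}{map[string]interface{}{"long": 1}, nil}

	for i := range memo {
		// Without the nil check, the assertion below panics on memo[1].
		if memo[i] != nil {
			m := memo[i].(map[string]interface{})
			fmt.Println("reusing memoized encoding:", m)
		} else {
			fmt.Println("element was NULL previously; encode from scratch")
		}
	}
}
```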
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
@@ -210,7 +210,7 @@ func createBenchmarkChangefeed(
}
initialHighWater := hlc.Timestamp{}
encodingOpts := changefeedbase.EncodingOptions{Format: changefeedbase.OptFormatJSON, Envelope: changefeedbase.OptEnvelopeRow}
encoder, err := makeJSONEncoder(encodingOpts, AllTargets(details))
encoder, err := makeJSONEncoder(encodingOpts)
if err != nil {
return nil, nil, err
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
"event.go",
"projection.go",
"rowfetcher_cache.go",
"version_cache.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent",
visibility = ["//visibility:public"],
59 changes: 57 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/event.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
@@ -345,7 +346,7 @@ func getEventDescriptorCached(
schemaTS hlc.Timestamp,
cache *cache.UnorderedCache,
) (*EventDescriptor, error) {
idVer := idVersion{id: desc.GetID(), version: desc.GetVersion(), family: family.ID}
idVer := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}

if v, ok := cache.Get(idVer); ok {
ed := v.(*EventDescriptor)
@@ -381,7 +382,7 @@ func NewEventDecoder(
return nil, err
}

eventDescriptorCache := cache.NewUnorderedCache(defaultCacheConfig)
eventDescriptorCache := cache.NewUnorderedCache(DefaultCacheConfig)
getEventDescriptor := func(
desc catalog.TableDescriptor,
family *descpb.ColumnFamilyDescriptor,
@@ -549,3 +550,57 @@ func TestingGetFamilyIDFromKey(
_, familyID, err := decoder.(*eventDecoder).rfCache.tableDescForKey(context.Background(), key, ts)
return familyID, err
}

// TestingMakeEventRowFromEncDatums creates an event row from the specified enc datum row.
// encRow is assumed to contain *already decoded* datums.
// The first numKeyCols are assumed to be primary key columns.
// Column names are generated based on the datum types.
func TestingMakeEventRowFromEncDatums(
encRow rowenc.EncDatumRow, colTypes []*types.T, numKeyCols int, deleted bool,
) Row {
if len(encRow) != len(colTypes) {
panic("unexpected length mismatch")
}
intRange := func(start, end int) (res []int) {
for i := 0; i < end; i++ {
res = append(res, i)
}
return res
}
ed := &EventDescriptor{
Metadata: Metadata{
TableID: 42,
TableName: "randtbl",
FamilyName: "primary",
},
cols: func() (cols []ResultColumn) {
names := make(map[string]int, len(encRow))
for i, typ := range colTypes {
colName := fmt.Sprintf("col_%s", typ.String())
names[colName]++
if names[colName] > 1 {
colName += fmt.Sprintf("_%d", names[colName]-1)
}
cols = append(cols, ResultColumn{
ResultColumn: colinfo.ResultColumn{
Name: colName,
Typ: typ,
TableID: 42,
},
ord: i,
})
}
return cols
}(),
keyCols: intRange(0, numKeyCols),
valueCols: intRange(0, len(encRow)),
}

var alloc tree.DatumAlloc
return Row{
EventDescriptor: ed,
datums: encRow,
deleted: deleted,
alloc: &alloc,
}
}
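A minimal sketch of how a test might call this helper, assuming the import paths from this repository; the column types and values are illustrative:

```go
package cdcevent_test // sketch only; not part of this change

import (
	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// exampleEventRow builds a synthetic two-column row (INT key, STRING value)
// suitable for feeding an encoder under test.
func exampleEventRow() cdcevent.Row {
	colTypes := []*types.T{types.Int, types.String}
	encRow := rowenc.EncDatumRow{
		rowenc.DatumToEncDatum(types.Int, tree.NewDInt(tree.DInt(1))),
		rowenc.DatumToEncDatum(types.String, tree.NewDString("hello")),
	}
	// The first column is treated as the primary key; the row is not a delete.
	return cdcevent.TestingMakeEventRowFromEncDatums(encRow, colTypes, 1, false)
}
```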
18 changes: 2 additions & 16 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
@@ -57,20 +57,6 @@ type watchedFamily struct {
familyName string
}

var defaultCacheConfig = cache.Config{
Policy: cache.CacheFIFO,
// TODO: If we find ourselves thrashing here in changefeeds on many tables,
// we can improve performance by eagerly evicting versions using Resolved notifications.
// A old version with a timestamp entirely before a notification can be safely evicted.
ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 },
}

type idVersion struct {
id descpb.ID
version descpb.DescriptorVersion
family descpb.FamilyID
}

// newRowFetcherCache constructs row fetcher cache.
func newRowFetcherCache(
ctx context.Context,
@@ -96,7 +82,7 @@ func newRowFetcherCache(
leaseMgr: leaseMgr,
collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */),
db: db,
fetchers: cache.NewUnorderedCache(defaultCacheConfig),
fetchers: cache.NewUnorderedCache(DefaultCacheConfig),
watchedFamilies: watchedFamilies,
}, err
}
@@ -192,7 +178,7 @@ var ErrUnwatchedFamily = errors.New("watched table but unwatched family")
func (c *rowFetcherCache) RowFetcherForColumnFamily(
tableDesc catalog.TableDescriptor, family descpb.FamilyID,
) (*row.Fetcher, *descpb.ColumnFamilyDescriptor, error) {
idVer := idVersion{id: tableDesc.GetID(), version: tableDesc.GetVersion(), family: family}
idVer := CacheKey{ID: tableDesc.GetID(), Version: tableDesc.GetVersion(), FamilyID: family}
if v, ok := c.fetchers.Get(idVer); ok {
f := v.(*cachedFetcher)
if f.skip {
42 changes: 42 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/version_cache.go
@@ -0,0 +1,42 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdcevent

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
)

// DefaultCacheConfig is the default configuration for the unordered cache.
var DefaultCacheConfig = cache.Config{
Policy: cache.CacheFIFO,
// TODO: If we find ourselves thrashing here in changefeeds on many tables,
// we can improve performance by eagerly evicting versions using Resolved notifications.
// An old Version with a timestamp entirely before a notification can be safely evicted.
ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 },
}

// CacheKey is the key for the event caches.
type CacheKey struct {
ID descpb.ID
Version descpb.DescriptorVersion
FamilyID descpb.FamilyID
}

// GetCachedOrCreate returns the cached object, or creates and caches a new one.
func GetCachedOrCreate(
k CacheKey, c *cache.UnorderedCache, creator func() interface{},
) interface{} {
if v, ok := c.Get(k); ok {
return v
}
v := creator()
c.Add(k, v)
return v
}
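A minimal sketch of how a caller in this package might use the helper, in the style of getEventDescriptorCached above; buildEventDescriptor is a hypothetical stand-in for the package's real EventDescriptor construction, and the extra catalog import is assumed:

```go
// Sketch only (within package cdcevent); assumes
// "github.com/cockroachdb/cockroach/pkg/sql/catalog" is also imported.
func eventDescriptorFor(
	c *cache.UnorderedCache,
	desc catalog.TableDescriptor,
	family *descpb.ColumnFamilyDescriptor,
	buildEventDescriptor func() *EventDescriptor, // hypothetical constructor
) *EventDescriptor {
	k := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}
	// Build only on a cache miss; later lookups for the same
	// (ID, Version, FamilyID) key reuse the cached descriptor.
	v := GetCachedOrCreate(k, c, func() interface{} { return buildEventDescriptor() })
	return v.(*EventDescriptor)
}
```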