Skip to content

Commit

Permalink
changefeedccl: Improve JSON encoder performance
Browse files Browse the repository at this point in the history
Rewrite JSON encoder to improve its performance.

Prior to this change JSON encoder was very inefficient.
This inefficiency had multiple underlying reasons:
  * New Go map objects were constructed for each event.
  * Underlying json conversion functions had inefficiencies
    (tracked in cockroachdb#87968)
  * Conversion of Go maps to JSON incurs the cost
    of sorting the keys -- for each row. Sorting,
    particularly when rows are wide, has significant cost.
  * Each conversion to JSON allocated new array builder
    (to encode keys) and new object builder; that too has cost.
  * Underlying code structure, while attempting to reuse
    code when constructing different "envelope" formats,
    cause the code to be more inefficient.

This PR addresses all of the above.  In particular, since
a schema version for the table is guaranteeed to have
the same set of primary key and value columns, we can construct
JSON builders once.  The expensive sort operation can be performed
once per version; builders can be memoized and cached.

The performance impact is significant:
  * Key encoding speed up is 5-30%, depending on the number of primary
    keys.
  * Value encoding 30% - 60% faster (slowest being "wrapped" envelope
    with diff -- which effectively encodes 2x values)
  * Byte allocations per row reduces by over 70%, with the number
    of allocations reduced similarly.

Release note (enterprise change): Changefeed JSON encoder
performance improved by 50%.
Release justification: performance improvement
  • Loading branch information
Yevgeniy Miretskiy committed Sep 28, 2022
1 parent d170075 commit e6dbfbc
Show file tree
Hide file tree
Showing 17 changed files with 469 additions and 149 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func createBenchmarkChangefeed(
}
initialHighWater := hlc.Timestamp{}
encodingOpts := changefeedbase.EncodingOptions{Format: changefeedbase.OptFormatJSON, Envelope: changefeedbase.OptEnvelopeRow}
encoder, err := makeJSONEncoder(encodingOpts, AllTargets(details))
encoder, err := makeJSONEncoder(encodingOpts)
if err != nil {
return nil, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"event.go",
"projection.go",
"rowfetcher_cache.go",
"version_cache.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent",
visibility = ["//visibility:public"],
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func getEventDescriptorCached(
schemaTS hlc.Timestamp,
cache *cache.UnorderedCache,
) (*EventDescriptor, error) {
idVer := idVersion{id: desc.GetID(), version: desc.GetVersion(), family: family.ID}
idVer := CacheKey{ID: desc.GetID(), Version: desc.GetVersion(), FamilyID: family.ID}

if v, ok := cache.Get(idVer); ok {
ed := v.(*EventDescriptor)
Expand Down Expand Up @@ -382,7 +382,7 @@ func NewEventDecoder(
return nil, err
}

eventDescriptorCache := cache.NewUnorderedCache(defaultCacheConfig)
eventDescriptorCache := cache.NewUnorderedCache(DefaultCacheConfig)
getEventDescriptor := func(
desc catalog.TableDescriptor,
family *descpb.ColumnFamilyDescriptor,
Expand Down
18 changes: 2 additions & 16 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,6 @@ type watchedFamily struct {
familyName string
}

var defaultCacheConfig = cache.Config{
Policy: cache.CacheFIFO,
// TODO: If we find ourselves thrashing here in changefeeds on many tables,
// we can improve performance by eagerly evicting versions using Resolved notifications.
// A old version with a timestamp entirely before a notification can be safely evicted.
ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 },
}

type idVersion struct {
id descpb.ID
version descpb.DescriptorVersion
family descpb.FamilyID
}

// newRowFetcherCache constructs row fetcher cache.
func newRowFetcherCache(
ctx context.Context,
Expand All @@ -96,7 +82,7 @@ func newRowFetcherCache(
leaseMgr: leaseMgr,
collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */),
db: db,
fetchers: cache.NewUnorderedCache(defaultCacheConfig),
fetchers: cache.NewUnorderedCache(DefaultCacheConfig),
watchedFamilies: watchedFamilies,
}, err
}
Expand Down Expand Up @@ -192,7 +178,7 @@ var ErrUnwatchedFamily = errors.New("watched table but unwatched family")
func (c *rowFetcherCache) RowFetcherForColumnFamily(
tableDesc catalog.TableDescriptor, family descpb.FamilyID,
) (*row.Fetcher, *descpb.ColumnFamilyDescriptor, error) {
idVer := idVersion{id: tableDesc.GetID(), version: tableDesc.GetVersion(), family: family}
idVer := CacheKey{ID: tableDesc.GetID(), Version: tableDesc.GetVersion(), FamilyID: family}
if v, ok := c.fetchers.Get(idVer); ok {
f := v.(*cachedFetcher)
if f.skip {
Expand Down
42 changes: 42 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/version_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdcevent

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/cache"
)

// DefaultCacheConfig is the default configuration for unordered cache.
var DefaultCacheConfig = cache.Config{
Policy: cache.CacheFIFO,
// TODO: If we find ourselves thrashing here in changefeeds on many tables,
// we can improve performance by eagerly evicting versions using Resolved notifications.
// A old Version with a timestamp entirely before a notification can be safely evicted.
ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 },
}

// CacheKey is the key for the event caches.
type CacheKey struct {
ID descpb.ID
Version descpb.DescriptorVersion
FamilyID descpb.FamilyID
}

// GetCachedOrCreate returns cached object, or creates and caches new one.
func GetCachedOrCreate(
k CacheKey, c *cache.UnorderedCache, creator func() interface{},
) interface{} {
if v, ok := c.Get(k); ok {
return v
}
v := creator()
c.Add(k, v)
return v
}
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ func createChangefeedJobRecord(
// the default error to avoid claiming the user set an option they didn't
// explicitly set. Fortunately we know the only way to cause this is to
// set envelope.
if isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink) {
if (isCloudStorageSink(parsedSink) || isWebhookSink(parsedSink)) &&
encodingOpts.Envelope != changefeedbase.OptEnvelopeBare {
if err = opts.ForceKeyInValue(); err != nil {
return nil, errors.Errorf(`this sink is incompatible with envelope=%s`, encodingOpts.Envelope)
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ func TestChangefeedProjectionDelete(t *testing.T) {
`foo: [0]->{}`,
})
}
cdcTest(t, testFn)
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

// If we drop columns which are not targeted by the changefeed, it should not backfill.
Expand Down Expand Up @@ -2473,9 +2473,7 @@ func TestChangefeedBareJSON(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO foo values (0, 'dog')`)
foo := feed(t, f, `CREATE CHANGEFEED WITH schema_change_policy=stop AS SELECT * FROM foo`)
defer closeFeed(t, foo)
assertPayloads(t, foo, []string{
`foo: [0]->{"a": 0, "b": "dog"}`,
})
assertPayloads(t, foo, []string{`foo: [0]->{"a": 0, "b": "dog"}`})
}
cdcTest(t, testFn, feedTestForceSink("kafka"))
cdcTest(t, testFn, feedTestForceSink("enterprise"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func getEncoder(
) (Encoder, error) {
switch opts.Format {
case changefeedbase.OptFormatJSON:
return makeJSONEncoder(opts, targets)
return makeJSONEncoder(opts)
case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
return newConfluentAvroEncoder(opts, targets)
case changefeedbase.OptFormatCSV:
Expand Down
Loading

0 comments on commit e6dbfbc

Please sign in to comment.