changefeedccl: Expand the set of available compression algorithms
Expand the set of supported compression algorithms in changefeed.

A faster implementation of the gzip algorithm is available and is used
by default. The gzip implementation can be reverted to the Go standard
library implementation via the `changefeed.fast_gzip.enabled` setting.

In addition, add support for compressing files with zstd.

Release note (enterprise change): Changefeeds can emit files compressed
with the zstd algorithm, which provides good compression and is much
faster than gzip. In addition, a new, faster implementation of gzip is
used by default.
Yevgeniy Miretskiy committed Sep 27, 2022
1 parent c55586b commit d082e7f
Showing 6 changed files with 95 additions and 20 deletions.
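
For context on the release note above: the "fast gzip" implementation comes from the third-party github.com/klauspost/pgzip package, which compresses blocks in parallel while exposing the same writer API as the standard library, and zstd support comes from github.com/klauspost/compress/zstd. The following standalone Go sketch is not part of this commit (payload and sizes are illustrative); it only shows the three writers this change switches between:

package main

import (
    "bytes"
    stdgzip "compress/gzip"
    "fmt"
    "log"

    "github.com/klauspost/compress/zstd"
    "github.com/klauspost/pgzip"
)

func main() {
    payload := bytes.Repeat([]byte("changefeed row data "), 1024)

    // Standard library gzip: single-threaded DEFLATE.
    var a bytes.Buffer
    gw, err := stdgzip.NewWriterLevel(&a, stdgzip.DefaultCompression)
    if err != nil {
        log.Fatal(err)
    }
    gw.Write(payload)
    gw.Close()

    // pgzip: produces the same gzip format, but compresses blocks in parallel.
    var b bytes.Buffer
    pw, err := pgzip.NewWriterLevel(&b, pgzip.DefaultCompression)
    if err != nil {
        log.Fatal(err)
    }
    pw.Write(payload)
    pw.Close()

    // zstd: a different format (".zst" files), typically much faster than gzip.
    var c bytes.Buffer
    zw, err := zstd.NewWriter(&c, zstd.WithEncoderLevel(zstd.SpeedFastest))
    if err != nil {
        log.Fatal(err)
    }
    zw.Write(payload)
    zw.Close()

    fmt.Printf("gzip=%d pgzip=%d zstd=%d compressed bytes\n", a.Len(), b.Len(), c.Len())
}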
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -12,6 +12,7 @@ bulkio.backup.read_with_priority_after duration 1m0s amount of time since the re
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer
changefeed.event_consumer_workers integer 8 the number of workers to use when processing events; 0 or 1 disables
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -18,6 +18,7 @@
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of eventswhich a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>8</code></td><td>the number of workers to use when processing events; 0 or 1 disables</td></tr>
<tr><td><code>changefeed.fast_gzip.enabled</code></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td></tr>
<tr><td><code>changefeed.schema_feed.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td></tr>
<tr><td><code>cloudstorage.http.custom_ca</code></td><td>string</td><td><code></code></td><td>custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage</td></tr>
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"changefeed_dist.go",
"changefeed_processors.go",
"changefeed_stmt.go",
"compression.go",
"doc.go",
"encoder.go",
"encoder_avro.go",
@@ -96,6 +97,7 @@ go_library(
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -298,7 +298,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptUpdatedTimestamps: flagOption,
OptMVCCTimestamps: flagOption,
OptDiff: flagOption,
- OptCompression: enum("gzip"),
+ OptCompression: enum("gzip", "zstd"),
OptSchemaChangeEvents: enum("column_changes", "default"),
OptSchemaChangePolicy: enum("backfill", "nobackfill", "stop", "ignore"),
OptSplitColumnFamilies: flagOption,
70 changes: 70 additions & 0 deletions pkg/ccl/changefeedccl/compression.go
@@ -0,0 +1,70 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
stdgzip "compress/gzip"
"io"
"strings"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
"github.com/klauspost/compress/zstd"
"github.com/klauspost/pgzip"
)

var useFastGzip = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.fast_gzip.enabled",
"use fast gzip implementation",
util.ConstantWithMetamorphicTestBool(
"changefeed.fast_gzip.enabled", true,
),
).WithPublic()

type compressionAlgo string

const sinkCompressionGzip compressionAlgo = "gzip"
const sinkCompressionZstd compressionAlgo = "zstd"

func (a compressionAlgo) enabled() bool {
return a != ""
}

// newCompressionCodec returns compression codec for the specified algorithm,
// which writes compressed data to the destination.
// TODO(yevgeniy): Support compression configuration (level, speed, etc).
// TODO(yevgeniy): Add telemetry.
func newCompressionCodec(
algo compressionAlgo, sv *settings.Values, dest io.Writer,
) (io.WriteCloser, error) {
switch algo {
case sinkCompressionGzip:
if useFastGzip.Get(sv) {
return pgzip.NewWriterLevel(dest, pgzip.DefaultCompression)
}
return stdgzip.NewWriterLevel(dest, stdgzip.DefaultCompression)
case sinkCompressionZstd:
return zstd.NewWriter(dest, zstd.WithEncoderLevel(zstd.SpeedFastest))
default:
return nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo)
}
}

// compressionFromString returns compression algorithm type along with file extension.
func compressionFromString(algo string) (_ compressionAlgo, ext string, _ error) {
if strings.EqualFold(algo, string(sinkCompressionGzip)) {
return sinkCompressionGzip, ".gz", nil
}
if strings.EqualFold(algo, string(sinkCompressionZstd)) {
return sinkCompressionZstd, ".zst", nil
}
return "", "", errors.AssertionFailedf("unsupported compression algorithm %q", algo)
}
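
A sketch of how these two helpers compose, written as hypothetical in-package code (the surrounding function and the use of cluster.MakeTestingClusterSettings are illustrative assumptions, not part of the commit): the user-supplied option string is mapped to an algorithm plus file extension, and the codec then wraps whatever buffer the sink flushes.

// Hypothetical usage inside package changefeedccl; assumes imports of "bytes"
// and "github.com/cockroachdb/cockroach/pkg/settings/cluster".
func exampleCompress(payload []byte) ([]byte, string, error) {
    st := cluster.MakeTestingClusterSettings() // settings carrier; real code uses the sink's settings

    // Option values are matched case-insensitively, so "ZSTD" works too.
    algo, ext, err := compressionFromString("zstd")
    if err != nil {
        return nil, "", err
    }

    var buf bytes.Buffer
    codec, err := newCompressionCodec(algo, &st.SV, &buf)
    if err != nil {
        return nil, "", err
    }
    if _, err := codec.Write(payload); err != nil {
        return nil, "", err
    }
    // Close flushes any buffered, partially compressed data into buf.
    if err := codec.Close(); err != nil {
        return nil, "", err
    }
    return buf.Bytes(), ext, nil // ext is ".zst" here, ".gz" for gzip
}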
39 changes: 20 additions & 19 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -10,7 +10,6 @@ package changefeedccl

import (
"bytes"
- "compress/gzip"
"context"
"crypto/rand"
"encoding/hex"
@@ -38,9 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/google/btree"
- // Placeholder for pgzip and zdstd.
- _ "github.com/klauspost/compress/zstd"
- _ "github.com/klauspost/pgzip"
)

func isCloudStorageSink(u *url.URL) bool {
@@ -291,7 +287,7 @@ type cloudStorageSink struct {
ext string
rowDelimiter []byte

- compression string
+ compression compressionAlgo

es cloud.ExternalStorage

@@ -316,8 +312,6 @@ type cloudStorageSink struct {
flushErr atomic.Value
}

- const sinkCompressionGzip = "gzip"

var cloudStorageSinkIDAtomic int64

// Files that are emitted can be partitioned by their earliest event time,
@@ -423,12 +417,12 @@ func makeCloudStorageSink(
}

if codec := encodingOpts.Compression; codec != "" {
- if strings.EqualFold(codec, "gzip") {
- s.compression = sinkCompressionGzip
- s.ext = s.ext + ".gz"
- } else {
- return nil, errors.Errorf(`unsupported compression codec %q`, codec)
+ algo, ext, err := compressionFromString(codec)
+ if err != nil {
+ return nil, err
}
+ s.compression = algo
+ s.ext = s.ext + ext
}

// We make the external storage with a nil IOAccountingInterceptor since we
@@ -446,27 +440,31 @@

func (s *cloudStorageSink) getOrCreateFile(
topic TopicDescriptor, eventMVCC hlc.Timestamp,
- ) *cloudStorageSinkFile {
+ ) (*cloudStorageSinkFile, error) {
name, _ := s.topicNamer.Name(topic)
key := cloudStorageSinkKey{name, int64(topic.GetVersion())}
if item := s.files.Get(key); item != nil {
f := item.(*cloudStorageSinkFile)
if eventMVCC.Less(f.oldestMVCC) {
f.oldestMVCC = eventMVCC
}
- return f
+ return f, nil
}
f := &cloudStorageSinkFile{
created: timeutil.Now(),
cloudStorageSinkKey: key,
oldestMVCC: eventMVCC,
}
- switch s.compression {
- case sinkCompressionGzip:
- f.codec = gzip.NewWriter(&f.buf)

+ if s.compression.enabled() {
+ codec, err := newCompressionCodec(s.compression, &s.settings.SV, &f.buf)
+ if err != nil {
+ return nil, err
+ }
+ f.codec = codec
}
s.files.ReplaceOrInsert(f)
- return f
+ return f, nil
}

// EmitRow implements the Sink interface.
@@ -482,7 +480,10 @@ func (s *cloudStorageSink) EmitRow(
}

s.metrics.recordMessageSize(int64(len(key) + len(value)))
- file := s.getOrCreateFile(topic, mvcc)
+ file, err := s.getOrCreateFile(topic, mvcc)
+ if err != nil {
+ return err
+ }
file.alloc.Merge(&alloc)

if _, err := file.Write(value); err != nil {
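
On the consumer side, files emitted by the cloud storage sink now carry a ".gz" or ".zst" suffix depending on the chosen algorithm, so readers must pick a matching decompressor. A minimal standalone sketch of reading back either kind of emitted file (file name and layout are illustrative assumptions, not part of this commit):

package main

import (
    "compress/gzip"
    "io"
    "log"
    "os"
    "strings"

    "github.com/klauspost/compress/zstd"
)

// decompressFile returns the decompressed contents of a changefeed file,
// choosing the reader based on the extension the sink appends.
func decompressFile(path string) ([]byte, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    switch {
    case strings.HasSuffix(path, ".gz"):
        r, err := gzip.NewReader(f)
        if err != nil {
            return nil, err
        }
        defer r.Close()
        return io.ReadAll(r)
    case strings.HasSuffix(path, ".zst"):
        d, err := zstd.NewReader(f)
        if err != nil {
            return nil, err
        }
        defer d.Close()
        return io.ReadAll(d)
    default:
        return os.ReadFile(path) // uncompressed file
    }
}

func main() {
    data, err := decompressFile("topic-1.ndjson.zst") // illustrative file name
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("decompressed %d bytes", len(data))
}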

