Skip to content

Commit

Permalink
Merge #86448 #86627
Browse files Browse the repository at this point in the history
86448: changefeedccl: Add options to enable the use of MuxRangeFeed RPC r=miretskiy a=miretskiy

Enable changefeed to use `MuxRangeFeed` RPC if
`changefeed.mux_rangefeed.enabled` is set to true.

Release note (enterprise change): Changefeeds may opt in
via `changefeed.mux_rangefeed.enabled` setting to use
`MuxRangeFeed` RPC which multiplexes multipe range feed streams
onto a single RPC stream per node.

Release justification: add option to enable functionality
that's disabled by default.

86627: sql: add COPY to sampled_query logging r=cucaroach a=otan

As requested by product, we want COPY to show up on sampled_query. This
commit adds this, whilst fudging a few things presumably more relevant
to "regular" DDL statements.

Release justification: telemetry only change

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
3 people committed Aug 23, 2022
3 parents 9b52585 + 81ca887 + 5f24073 commit 681f951
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
UseMux: changefeedbase.UseMuxRangeFeed.Get(&cfg.Settings.SV),
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/flowinfra",
"//pkg/util",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -214,3 +215,11 @@ var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting(
"if set, rather than only protecting changefeed targets from garbage collection during backfills, data will always be protected up to the changefeed's frontier",
true,
)

// UseMuxRangeFeed enables the use of MuxRangeFeed RPC.
var UseMuxRangeFeed = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.mux_rangefeed.enabled",
"if true, changefeed uses multiplexing rangefeed RPC",
util.ConstantWithMetamorphicTestBool("changefeed.mux_rangefeed.enabled", false),
)
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_test(
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Config struct {

// Knobs are kvfeed testing knobs.
Knobs TestingKnobs

// UseMux enables MuxRangeFeed rpc
UseMux bool
}

// Run will run the kvfeed. The feed runs synchronously and returns an
Expand Down Expand Up @@ -106,7 +109,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.InitialHighWater, cfg.EndTime,
cfg.Codec,
cfg.SchemaFeed,
sc, pff, bf, cfg.Knobs)
sc, pff, bf, cfg.UseMux, cfg.Knobs)
f.onBackfillCallback = cfg.OnBackfillCallback

g := ctxgroup.WithContext(ctx)
Expand Down Expand Up @@ -176,6 +179,8 @@ type kvFeed struct {
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy

useMux bool

// These dependencies are made available for test injection.
bufferFactory func() kvevent.Buffer
tableFeed schemafeed.SchemaFeed
Expand All @@ -200,6 +205,7 @@ func newKVFeed(
sc kvScanner,
pff physicalFeedFactory,
bf func() kvevent.Buffer,
useMux bool,
knobs TestingKnobs,
) *kvFeed {
return &kvFeed{
Expand All @@ -218,6 +224,7 @@ func newKVFeed(
scanner: sc,
physicalFeed: pff,
bufferFactory: bf,
useMux: useMux,
knobs: knobs,
}
}
Expand Down Expand Up @@ -463,6 +470,7 @@ func (f *kvFeed) runUntilTableEvent(
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
Knobs: f.knobs,
UseMux: f.useMux,
}

g.GoCtx(func(ctx context.Context) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -125,7 +126,9 @@ func TestKVFeed(t *testing.T) {
tc.needsInitialScan, tc.withDiff,
tc.initialHighWater, tc.endTime,
keys.SystemSQLCodec,
tf, sf, rangefeedFactory(ref.run), bufferFactory, TestingKnobs{})
tf, sf, rangefeedFactory(ref.run), bufferFactory,
util.ConstantWithMetamorphicTestBool("use_mux", true),
TestingKnobs{})
ctx, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type rangeFeedConfig struct {
Spans []kvcoord.SpanTimePair
WithDiff bool
Knobs TestingKnobs
UseMux bool
}

type rangefeedFactory func(
Expand Down Expand Up @@ -74,8 +75,13 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
}
g := ctxgroup.WithContext(ctx)
g.GoCtx(feed.addEventsToBuffer)
var rfOpts []kvcoord.RangeFeedOption
if cfg.UseMux {
rfOpts = append(rfOpts, kvcoord.WithMuxRangeFeed())
}

g.GoCtx(func(ctx context.Context) error {
return p(ctx, cfg.Spans, cfg.WithDiff, feed.eventC)
return p(ctx, cfg.Spans, cfg.WithDiff, feed.eventC, rfOpts...)
})
return g.Wait()
}
Expand Down
21 changes: 20 additions & 1 deletion pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,7 +2443,26 @@ func (ex *connExecutor) execCopyIn(
payload := eventNonRetriableErrPayload{err: err}
return ev, payload, nil
}
defer cm.Close(ctx)
defer func() {
cm.Close(ctx)

// These fields are not available in COPY, so use the empty value.
var stmtFingerprintID roachpb.StmtFingerprintID
var stats topLevelQueryStats
ex.planner.maybeLogStatement(
ctx,
ex.executorType,
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
cm.numInsertedRows(),
retErr,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
stmtFingerprintID,
&stats,
)
}()

if err := ex.execWithProfiling(ctx, cmd.Stmt, nil, func(ctx context.Context) error {
return cm.run(ctx)
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var copyBatchRowSize = util.ConstantWithMetamorphicTestRange("copy-batch-size",

type copyMachineInterface interface {
run(ctx context.Context) error
numInsertedRows() int

// Close closes memory accounts associated with copy.
Close(ctx context.Context)
Expand Down Expand Up @@ -266,6 +267,10 @@ func newCopyMachine(
return c, nil
}

func (c *copyMachine) numInsertedRows() int {
return c.insertedRows
}

func (c *copyMachine) initMonitoring(ctx context.Context, parentMon *mon.BytesMonitor) {
// Create a monitor for the COPY command so it can be tracked separate from transaction or session.
memMetrics := &MemoryMetrics{}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func CopyInFileStmt(destination, schema, table string) string {
)
}

func (f *fileUploadMachine) numInsertedRows() int {
return f.c.numInsertedRows()
}

func (f *fileUploadMachine) Close(ctx context.Context) {
f.c.Close(ctx)
}
Expand Down

0 comments on commit 681f951

Please sign in to comment.