diff --git a/.bazelrc b/.bazelrc index 57ec9306980b..7731062479a6 100644 --- a/.bazelrc +++ b/.bazelrc @@ -34,6 +34,8 @@ info --ui_event_filters=-WARNING build:race --@io_bazel_rules_go//go/config:race "--test_env=GORACE=halt_on_error=1 log_path=stdout" --test_sharding_strategy=disabled test:test --test_env=TZ= +# Note: these timeout values are used indirectly in `build/teamcity/cockroach/ci/tests/testrace_impl.sh`. +# If those values are updated, the script should be updated accordingly. test:race --test_timeout=1200,6000,18000,72000 # CI should always run with `--config=ci` or `--config=cinolint`. diff --git a/build/teamcity/cockroach/ci/tests/testrace_impl.sh b/build/teamcity/cockroach/ci/tests/testrace_impl.sh index 93b2475a7038..d34e54871e59 100755 --- a/build/teamcity/cockroach/ci/tests/testrace_impl.sh +++ b/build/teamcity/cockroach/ci/tests/testrace_impl.sh @@ -6,6 +6,7 @@ set -xeuo pipefail # packages are expected to be formatted as go-style, e.g. ./pkg/cmd/bazci. bazel build //pkg/cmd/bazci --config=ci +size_to_timeout=("small:1200" "medium:6000" "large:18000" "enormous:72000") for pkg in "$@" do # Query to list all affected tests. @@ -14,19 +15,26 @@ do then pkg="$pkg:all" fi - tests=$(bazel query "kind(go_test, $pkg)" --output=label) - # Run affected tests. - for test in $tests + for kv in "${size_to_timeout[@]}"; do - if [[ ! -z $(bazel query "attr(tags, \"broken_in_bazel\", $test)") ]] - then - echo "Skipping test $test as it is broken in bazel" - continue - fi - $(bazel info bazel-bin --config=ci)/pkg/cmd/bazci/bazci_/bazci -- test --config=ci --config=race "$test" \ - --test_env=COCKROACH_LOGIC_TESTS_SKIP=true \ - --test_env=GOMAXPROCS=8 + size="${kv%%:*}" + timeout="${kv#*:}" + go_timeout=$(($timeout - 5)) + tests=$(bazel query "attr(size, $size, kind("go_test", tests($pkg)))" --output=label) + # Run affected tests. + for test in $tests + do + if [[ ! -z $(bazel query "attr(tags, \"broken_in_bazel\", $test)") ]] + then + echo "Skipping test $test as it is broken in bazel" + continue + fi + $(bazel info bazel-bin --config=ci)/pkg/cmd/bazci/bazci_/bazci -- test --config=ci --config=race "$test" \ + --test_env=COCKROACH_LOGIC_TESTS_SKIP=true \ + --test_env=GOMAXPROCS=8 \ + --test_arg=-test.timeout="${go_timeout}s" + done done done diff --git a/pkg/cli/start.go b/pkg/cli/start.go index 0e5ac747591e..e1853944f820 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -953,7 +953,7 @@ func reportServerInfo( buf.Printf("CockroachDB %s starting at %s (took %0.1fs)\n", srvS, timeutil.Now(), timeutil.Since(startTime).Seconds()) buf.Printf("build:\t%s %s @ %s (%s)\n", redact.Safe(info.Distribution), redact.Safe(info.Tag), redact.Safe(info.Time), redact.Safe(info.GoVersion)) - buf.Printf("webui:\t%s\n", serverCfg.AdminURL()) + buf.Printf("webui:\t%s\n", log.SafeManaged(serverCfg.AdminURL())) // (Re-)compute the client connection URL. We cannot do this // earlier (e.g. above, in the runStart function) because @@ -964,53 +964,53 @@ func reportServerInfo( log.Ops.Errorf(ctx, "failed computing the URL: %v", err) return err } - buf.Printf("sql:\t%s\n", pgURL.ToPQ()) - buf.Printf("sql (JDBC):\t%s\n", pgURL.ToJDBC()) + buf.Printf("sql:\t%s\n", log.SafeManaged(pgURL.ToPQ())) + buf.Printf("sql (JDBC):\t%s\n", log.SafeManaged(pgURL.ToJDBC())) - buf.Printf("RPC client flags:\t%s\n", clientFlagsRPC()) + buf.Printf("RPC client flags:\t%s\n", log.SafeManaged(clientFlagsRPC())) if len(serverCfg.SocketFile) != 0 { - buf.Printf("socket:\t%s\n", serverCfg.SocketFile) + buf.Printf("socket:\t%s\n", log.SafeManaged(serverCfg.SocketFile)) } logNum := 1 _ = cliCtx.logConfig.IterateDirectories(func(d string) error { if logNum == 1 { // Backward-compatibility. - buf.Printf("logs:\t%s\n", d) + buf.Printf("logs:\t%s\n", log.SafeManaged(d)) } else { - buf.Printf("logs[%d]:\t%s\n", logNum, d) + buf.Printf("logs[%d]:\t%s\n", log.SafeManaged(logNum), log.SafeManaged(d)) } logNum++ return nil }) if serverCfg.Attrs != "" { - buf.Printf("attrs:\t%s\n", serverCfg.Attrs) + buf.Printf("attrs:\t%s\n", log.SafeManaged(serverCfg.Attrs)) } if len(serverCfg.Locality.Tiers) > 0 { - buf.Printf("locality:\t%s\n", serverCfg.Locality) + buf.Printf("locality:\t%s\n", log.SafeManaged(serverCfg.Locality)) } if tmpDir := serverCfg.SQLConfig.TempStorageConfig.Path; tmpDir != "" { - buf.Printf("temp dir:\t%s\n", tmpDir) + buf.Printf("temp dir:\t%s\n", log.SafeManaged(tmpDir)) } if ext := st.ExternalIODir; ext != "" { - buf.Printf("external I/O path: \t%s\n", ext) + buf.Printf("external I/O path: \t%s\n", log.SafeManaged(ext)) } else { buf.Printf("external I/O path: \t\n") } for i, spec := range serverCfg.Stores.Specs { - buf.Printf("store[%d]:\t%s\n", i, spec) + buf.Printf("store[%d]:\t%s\n", i, log.SafeManaged(spec)) } buf.Printf("storage engine: \t%s\n", &serverCfg.StorageEngine) // Print the commong server identifiers. if baseCfg.ClusterName != "" { - buf.Printf("cluster name:\t%s\n", baseCfg.ClusterName) + buf.Printf("cluster name:\t%s\n", log.SafeManaged(baseCfg.ClusterName)) } clusterID := serverCfg.BaseConfig.ClusterIDContainer.Get() if tenantClusterID.Equal(uuid.Nil) { - buf.Printf("clusterID:\t%s\n", clusterID) + buf.Printf("clusterID:\t%s\n", log.SafeManaged(clusterID)) } else { - buf.Printf("storage clusterID:\t%s\n", clusterID) - buf.Printf("tenant clusterID:\t%s\n", tenantClusterID) + buf.Printf("storage clusterID:\t%s\n", log.SafeManaged(clusterID)) + buf.Printf("tenant clusterID:\t%s\n", log.SafeManaged(tenantClusterID)) } nodeID := serverCfg.BaseConfig.IDContainer.Get() if isHostNode { @@ -1035,7 +1035,7 @@ func reportServerInfo( buf.Printf("KV addresses:\t") comma := redact.SafeString("") for _, addr := range serverCfg.SQLConfig.TenantKVAddrs { - buf.Printf("%s%s", comma, addr) + buf.Printf("%s%s", comma, log.SafeManaged(addr)) comma = ", " } buf.Printf("\n") @@ -1118,7 +1118,7 @@ func reportConfiguration(ctx context.Context) { // running as root in a multi-user environment, or using different // uid/gid across runs in the same data directory. To determine // this, it's easier if the information appears in the log file. - log.Ops.Infof(ctx, "process identity: %s", sysutil.ProcessIdentity()) + log.Ops.Infof(ctx, "process identity: %s", log.SafeManaged(sysutil.ProcessIdentity())) } func maybeWarnMemorySizes(ctx context.Context) { @@ -1133,7 +1133,7 @@ func maybeWarnMemorySizes(ctx context.Context) { } else { fmt.Fprintf(&buf, " If you have a dedicated server a reasonable setting is 25%% of physical memory.") } - log.Ops.Warningf(ctx, "%s", buf.String()) + log.Ops.Warningf(ctx, "%s", redact.Safe(buf.String())) } // Check that the total suggested "max" memory is well below the available memory. @@ -1216,13 +1216,13 @@ func setupAndInitializeLoggingAndProfiling( "- Intruders with access to your machine or network can observe client-server traffic.\n"+ "- Intruders can log in without password and read or write any data in the cluster.\n"+ "- Intruders can consume all your server's resources and cause unavailability.", - addr) + log.SafeManaged(addr)) log.Ops.Shoutf(ctx, severity.INFO, "To start a secure server without mandating TLS for clients,\n"+ "consider --accept-sql-without-tls instead. For other options, see:\n\n"+ "- %s\n"+ "- %s", - build.MakeIssueURL(53404), + redact.Safe(build.MakeIssueURL(53404)), redact.Safe(docs.URL("secure-a-cluster.html")), ) } @@ -1246,7 +1246,7 @@ func setupAndInitializeLoggingAndProfiling( // We log build information to stdout (for the short summary), but also // to stderr to coincide with the full logs. info := build.GetInfo() - log.Ops.Infof(ctx, "%s", info.Short()) + log.Ops.Infof(ctx, "%s", log.SafeManaged(info.Short())) initTraceDir(ctx, serverCfg.InflightTraceDirName) initCPUProfile(ctx, serverCfg.CPUProfileDirName, serverCfg.Settings) @@ -1267,8 +1267,10 @@ func setupAndInitializeLoggingAndProfiling( func initGEOS(ctx context.Context) { loc, err := geos.EnsureInit(geos.EnsureInitErrorDisplayPrivate, startCtx.geoLibsDir) if err != nil { - log.Ops.Warningf(ctx, "could not initialize GEOS - spatial functions may not be available: %v", err) + log.Ops.Warningf(ctx, + "could not initialize GEOS - spatial functions may not be available: %v", + log.SafeManaged(err)) } else { - log.Ops.Infof(ctx, "GEOS loaded from directory %s", loc) + log.Ops.Infof(ctx, "GEOS loaded from directory %s", log.SafeManaged(loc)) } } diff --git a/pkg/clusterversion/BUILD.bazel b/pkg/clusterversion/BUILD.bazel index 15663edff181..c3b11920778c 100644 --- a/pkg/clusterversion/BUILD.bazel +++ b/pkg/clusterversion/BUILD.bazel @@ -21,6 +21,7 @@ go_library( deps = [ "//pkg/roachpb", "//pkg/settings", + "//pkg/util/envutil", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 0482d97b874a..229068bf9faf 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -10,7 +10,10 @@ package clusterversion -import "github.com/cockroachdb/cockroach/pkg/roachpb" +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/envutil" +) // Key is a unique identifier for a version of CockroachDB. type Key int @@ -472,13 +475,10 @@ var rawVersionsSingleton = keyedVersions{ } const ( - // unstableVersionsAbove is a cluster version Key above which any upgrades in - // this version are considered unstable development-only versions if it is not - // negative, and upgrading to them should permanently move a cluster to - // development versions. On master it should be the minted version of the last - // release, while on release branches it can be set to invalidVersionKey to - // disable marking any versions as development versions. - unstableVersionsAbove = V22_1 + // developmentBranch should be toggled to false on a release branch once the + // set of versions becomes append-only and associated upgrade implementations + // are frozen. It is always true on the main development branch. + developmentBranch = true // finalVersion should be set on a release branch to the minted final cluster // version key, e.g. to V22_2 on the release-22.2 branch once it is minted. @@ -486,14 +486,27 @@ const ( finalVersion = invalidVersionKey ) +// devVersionsAbove is the version key above which all versions are offset to be +// development version when developmentBranch is true. By default this is all +// versions, by setting this to -1, but an env var can override this, to leave +// the first version un-offset. Doing so means that that version, which is +// generally minBinaryVersion as well, is unchanged, and thus allows upgrading a +// stable release data-dir to a dev version if desired. +var devVersionsAbove Key = func() Key { + if envutil.EnvOrDefaultBool("COCKROACH_UPGRADE_TO_DEV_VERSION", false) { + return invalidVersionKey + 1 + } + return invalidVersionKey +}() + var versionsSingleton = func() keyedVersions { - if unstableVersionsAbove > invalidVersionKey { + if developmentBranch { const devOffset = 1000000 // Throw every version above the last release (which will be none on a release // branch) 1 million major versions into the future, so any "upgrade" to a // release branch build will be a downgrade and thus blocked. for i := range rawVersionsSingleton { - if rawVersionsSingleton[i].Key > unstableVersionsAbove { + if rawVersionsSingleton[i].Key > devVersionsAbove { rawVersionsSingleton[i].Major += devOffset } } diff --git a/pkg/cmd/roachtest/tests/versionupgrade.go b/pkg/cmd/roachtest/tests/versionupgrade.go index e59562a984ff..25549ad6cccf 100644 --- a/pkg/cmd/roachtest/tests/versionupgrade.go +++ b/pkg/cmd/roachtest/tests/versionupgrade.go @@ -431,6 +431,7 @@ func upgradeNodes( binary := uploadVersion(ctx, t, c, c.Node(node), newVersion) settings := install.MakeClusterSettings(install.BinaryOption(binary)) + settings.Env = append(settings.Env, "COCKROACH_UPGRADE_TO_DEV_VERSION=true") c.Start(ctx, t.L(), startOpts, settings, c.Node(node)) } } diff --git a/pkg/kv/kvserver/client_migration_test.go b/pkg/kv/kvserver/client_migration_test.go index 6efacf3226ad..4e41acb8ede4 100644 --- a/pkg/kv/kvserver/client_migration_test.go +++ b/pkg/kv/kvserver/client_migration_test.go @@ -240,8 +240,8 @@ func TestMigrateWaitsForApplication(t *testing.T) { blockApplicationCh := make(chan struct{}) // We're going to be migrating from startV to endV. - startV := roachpb.Version{Major: 41} - endV := roachpb.Version{Major: 42} + startV := roachpb.Version{Major: 1000041} + endV := roachpb.Version{Major: 1000042} ctx := context.Background() tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ diff --git a/pkg/server/config.go b/pkg/server/config.go index c23b4804badb..9f22c7b988f8 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -537,7 +537,7 @@ func (cfg *Config) Report(ctx context.Context) { } else { log.Infof(ctx, "system total memory: %s", humanizeutil.IBytes(memSize)) } - log.Infof(ctx, "server configuration:\n%s", cfg) + log.Infof(ctx, "server configuration:\n%s", log.SafeManaged(cfg)) } // Engines is a container of engines, allowing convenient closing. @@ -732,7 +732,7 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) { } log.Infof(ctx, "%d storage engine%s initialized", - len(engines), util.Pluralize(int64(len(engines)))) + len(engines), redact.Safe(util.Pluralize(int64(len(engines))))) for _, s := range details { log.Infof(ctx, "%v", s) } diff --git a/pkg/server/goroutinedumper/goroutinedumper.go b/pkg/server/goroutinedumper/goroutinedumper.go index 4a8bd446f040..0f97e6ce1382 100644 --- a/pkg/server/goroutinedumper/goroutinedumper.go +++ b/pkg/server/goroutinedumper/goroutinedumper.go @@ -119,7 +119,7 @@ func NewGoroutineDumper( return nil, errors.New("directory to store dumps could not be determined") } - log.Infof(ctx, "writing goroutine dumps to %s", dir) + log.Infof(ctx, "writing goroutine dumps to %s", log.SafeManaged(dir)) gd := &GoroutineDumper{ heuristics: []heuristic{ diff --git a/pkg/server/heapprofiler/activequeryprofiler.go b/pkg/server/heapprofiler/activequeryprofiler.go index 4896351cde93..ef4df38feaeb 100644 --- a/pkg/server/heapprofiler/activequeryprofiler.go +++ b/pkg/server/heapprofiler/activequeryprofiler.go @@ -76,13 +76,13 @@ func NewActiveQueryProfiler( maxMem, warn, err := memLimitFn() if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to detect cgroup memory limit") } if warn != "" { - log.Warningf(ctx, "warning when reading cgroup memory limit: %s", warn) + log.Warningf(ctx, "warning when reading cgroup memory limit: %s", log.SafeManaged(warn)) } - log.Infof(ctx, "writing go query profiles to %s", dir) + log.Infof(ctx, "writing go query profiles to %s", log.SafeManaged(dir)) qp := &ActiveQueryProfiler{ profiler: profiler{ store: newProfileStore(dumpStore, QueryFileNamePrefix, QueryFileNameSuffix, st), diff --git a/pkg/server/heapprofiler/activequeryprofiler_test.go b/pkg/server/heapprofiler/activequeryprofiler_test.go index 614b23edf57f..3165c3b45ffc 100644 --- a/pkg/server/heapprofiler/activequeryprofiler_test.go +++ b/pkg/server/heapprofiler/activequeryprofiler_test.go @@ -36,7 +36,7 @@ func TestNewActiveQueryProfiler(t *testing.T) { { name: "returns error when no access to cgroups", wantErr: true, - errMsg: "cgroups not available", + errMsg: "failed to detect cgroup memory limit: cgroups not available", storeDir: heapProfilerDirName, limitFn: cgroupFnWithReturn(0, "", errors.New("cgroups not available")), }, diff --git a/pkg/server/heapprofiler/heapprofiler.go b/pkg/server/heapprofiler/heapprofiler.go index dd539bbb76b1..c2b100424800 100644 --- a/pkg/server/heapprofiler/heapprofiler.go +++ b/pkg/server/heapprofiler/heapprofiler.go @@ -46,7 +46,10 @@ func NewHeapProfiler(ctx context.Context, dir string, st *cluster.Settings) (*He return nil, errors.AssertionFailedf("need to specify dir for NewHeapProfiler") } - log.Infof(ctx, "writing go heap profiles to %s at least every %s", dir, resetHighWaterMarkInterval) + log.Infof(ctx, + "writing go heap profiles to %s at least every %s", + log.SafeManaged(dir), + resetHighWaterMarkInterval) dumpStore := dumpstore.NewStore(dir, maxCombinedFileSize, st) diff --git a/pkg/server/heapprofiler/statsprofiler.go b/pkg/server/heapprofiler/statsprofiler.go index ab219c469d53..a94bd15f6606 100644 --- a/pkg/server/heapprofiler/statsprofiler.go +++ b/pkg/server/heapprofiler/statsprofiler.go @@ -51,7 +51,7 @@ func NewStatsProfiler( return nil, errors.AssertionFailedf("need to specify dir for NewStatsProfiler") } - log.Infof(ctx, "writing memory stats to %s at last every %s", dir, resetHighWaterMarkInterval) + log.Infof(ctx, "writing memory stats to %s at last every %s", log.SafeManaged(dir), resetHighWaterMarkInterval) dumpStore := dumpstore.NewStore(dir, maxCombinedFileSize, st) diff --git a/pkg/server/server.go b/pkg/server/server.go index 4a752c5f23d0..aae5b26433e0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1518,14 +1518,15 @@ func (s *Server) PreStart(ctx context.Context) error { logPendingLossOfQuorumRecoveryEvents(ctx, s.node.stores) log.Ops.Infof(ctx, "starting %s server at %s (use: %s)", - redact.Safe(s.cfg.HTTPRequestScheme()), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr) + redact.Safe(s.cfg.HTTPRequestScheme()), log.SafeManaged(s.cfg.HTTPAddr), log.SafeManaged(s.cfg.HTTPAdvertiseAddr)) rpcConnType := redact.SafeString("grpc/postgres") if s.cfg.SplitListenSQL { rpcConnType = "grpc" - log.Ops.Infof(ctx, "starting postgres server at %s (use: %s)", s.cfg.SQLAddr, s.cfg.SQLAdvertiseAddr) + log.Ops.Infof(ctx, "starting postgres server at %s (use: %s)", + log.SafeManaged(s.cfg.SQLAddr), log.SafeManaged(s.cfg.SQLAdvertiseAddr)) } - log.Ops.Infof(ctx, "starting %s server at %s", rpcConnType, s.cfg.Addr) - log.Ops.Infof(ctx, "advertising CockroachDB node at %s", s.cfg.AdvertiseAddr) + log.Ops.Infof(ctx, "starting %s server at %s", log.SafeManaged(rpcConnType), log.SafeManaged(s.cfg.Addr)) + log.Ops.Infof(ctx, "advertising CockroachDB node at %s", log.SafeManaged(s.cfg.AdvertiseAddr)) log.Event(ctx, "accepting connections") diff --git a/pkg/server/tracedumper/tracedumper.go b/pkg/server/tracedumper/tracedumper.go index 0f42fa572e5b..41bec1bf80d0 100644 --- a/pkg/server/tracedumper/tracedumper.go +++ b/pkg/server/tracedumper/tracedumper.go @@ -117,7 +117,7 @@ func NewTraceDumper(ctx context.Context, dir string, st *cluster.Settings) *Trac return nil } - log.Infof(ctx, "writing job trace dumps to %s", dir) + log.Infof(ctx, "writing job trace dumps to %s", log.SafeManaged(dir)) td := &TraceDumper{ currentTime: timeutil.Now, diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index e5fe9ed6d0a6..e4730b72e096 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -60,6 +60,12 @@ type Columnarizer struct { metadataAllocator *colmem.Allocator input execinfra.RowSource da tree.DatumAlloc + // getWrappedExecStats, if non-nil, is the function to get the execution + // statistics of the wrapped row-by-row processor. We store it separately + // from execinfra.ProcessorBaseNoHelper.ExecStatsForTrace so that the + // function is not called when the columnarizer is being drained (which is + // after the vectorized stats are processed). + getWrappedExecStats func() *execinfrapb.ComponentStats batch coldata.Batch vecs coldata.TypedVecs @@ -174,7 +180,7 @@ func (c *Columnarizer) Init(ctx context.Context) { return } c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1) - ctx = c.StartInternalNoSpan(ctx) + ctx = c.StartInternal(ctx, "columnarizer" /* name */) c.input.Start(ctx) if execStatsHijacker, ok := c.input.(execinfra.ExecStatsForTraceHijacker); ok { // The columnarizer is now responsible for propagating the execution @@ -188,7 +194,7 @@ func (c *Columnarizer) Init(ctx context.Context) { // Still, just to be safe, we delay the hijacking until Init so that in // case the assumption is wrong, we still get the stats from the wrapped // processor. - c.ExecStatsForTrace = execStatsHijacker.HijackExecStatsForTrace() + c.getWrappedExecStats = execStatsHijacker.HijackExecStatsForTrace() } } @@ -200,10 +206,10 @@ func (c *Columnarizer) GetStats() *execinfrapb.ComponentStats { )) } componentID := c.FlowCtx.ProcessorComponentID(c.ProcessorID) - if c.removedFromFlow || c.ExecStatsForTrace == nil { + if c.removedFromFlow || c.getWrappedExecStats == nil { return &execinfrapb.ComponentStats{Component: componentID} } - s := c.ExecStatsForTrace() + s := c.getWrappedExecStats() s.Component = componentID return s } diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index f94f8d65bfbf..ec41314a60b1 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -265,19 +265,7 @@ func (m *Materializer) OutputTypes() []*types.T { // Start is part of the execinfra.RowSource interface. func (m *Materializer) Start(ctx context.Context) { - if len(m.drainHelper.statsCollectors) > 0 { - // Since we're collecting stats, we'll derive a separate tracing span - // for them. If we don't do this, then the stats would be attached to - // the span of the materializer's user, and if that user itself has a - // lot of payloads to attach (e.g. a joinReader attaching the KV keys it - // looked up), then the stats might be dropped based on the maximum size - // of structured payload per tracing span of 10KiB (see - // tracing.maxStructuredBytesPerSpan). Deriving a separate span - // guarantees that the stats won't be dropped. - ctx = m.StartInternal(ctx, "materializer" /* name */) - } else { - ctx = m.StartInternalNoSpan(ctx) - } + ctx = m.StartInternal(ctx, "materializer" /* name */) // We can encounter an expected error during Init (e.g. an operator // attempts to allocate a batch, but the memory budget limit has been // reached), so we need to wrap it with a catcher. diff --git a/pkg/sql/colflow/colrpc/BUILD.bazel b/pkg/sql/colflow/colrpc/BUILD.bazel index c5ff77010bcb..95d810545561 100644 --- a/pkg/sql/colflow/colrpc/BUILD.bazel +++ b/pkg/sql/colflow/colrpc/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", + "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index d8dab29483d5..00dbb4eb9197 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" + "go.opentelemetry.io/otel/attribute" ) // flowStreamClient is a utility interface used to mock out the RPC layer. @@ -165,6 +166,8 @@ func (o *Outbox) Run( ctx, o.span = execinfra.ProcessorSpan(ctx, "outbox") if o.span != nil { defer o.span.Finish() + o.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(flowID.String())) + o.span.SetTag(execinfrapb.StreamIDTagKey, attribute.IntValue(int(streamID))) } o.runnerCtx = ctx diff --git a/pkg/sql/colflow/flow_coordinator.go b/pkg/sql/colflow/flow_coordinator.go index 601c32a3ecba..3dce8db4b33c 100644 --- a/pkg/sql/colflow/flow_coordinator.go +++ b/pkg/sql/colflow/flow_coordinator.go @@ -117,7 +117,7 @@ func (f *FlowCoordinator) OutputTypes() []*types.T { // Start is part of the execinfra.RowSource interface. func (f *FlowCoordinator) Start(ctx context.Context) { - ctx = f.StartInternalNoSpan(ctx) + ctx = f.StartInternal(ctx, "flow coordinator" /* name */) if err := colexecerror.CatchVectorizedRuntimeError(func() { f.input.Start(ctx) }); err != nil { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index c758b846a1e1..f037c439715f 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1499,10 +1499,6 @@ type ExecutorTestingKnobs struct { // to use a transaction, and, in doing so, more deterministically allocate // descriptor IDs at the cost of decreased parallelism. UseTransactionalDescIDGenerator bool - - // NoStatsCollectionWithVerboseTracing is used to disable the execution - // statistics collection in presence of the verbose tracing. - NoStatsCollectionWithVerboseTracing bool } // PGWireTestingKnobs contains knobs for the pgwire module. @@ -2698,35 +2694,14 @@ func getMessagesForSubtrace( return nil, errors.Errorf("duplicate span %d", span.SpanID) } var allLogs []logRecordRow - const spanStartMsgTemplate = "=== SPAN START: %s ===" - - // spanStartMsgs are metadata about the span, e.g. the operation name and tags - // contained in the span. They are added as one log message. - spanStartMsgs := make([]string, 0) - - spanStartMsgs = append(spanStartMsgs, fmt.Sprintf(spanStartMsgTemplate, span.Operation)) - - for _, tg := range span.TagGroups { - var prefix string - if tg.Name != tracingpb.AnonymousTagGroupName { - prefix = fmt.Sprintf("%s-", tg.Name) - } - for _, tag := range tg.Tags { - if !strings.HasPrefix(tag.Key, tracing.TagPrefix) { - // Not a tag to be output. - continue - } - spanStartMsgs = append(spanStartMsgs, fmt.Sprintf("%s%s: %s", prefix, tag.Key, tag.Value)) - } - } - // This message holds all the spanStartMsgs and marks the beginning of the - // span, to indicate the start time and duration of the span. + // This message marks the beginning of the span, to indicate the start time + // and duration of the span. allLogs = append( allLogs, logRecordRow{ timestamp: span.StartTime, - msg: strings.Join(spanStartMsgs, "\n"), + msg: fmt.Sprintf("=== SPAN START: %s ===", span.Operation), span: span, index: 0, }, diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 1002121f96da..088d8886090e 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -276,11 +276,6 @@ func (h *ProcOutputHelper) ProcessRow( return h.outputRow, h.rowIdx < h.maxRowIdx, nil } -// consumerClosed stops output of additional rows from ProcessRow. -func (h *ProcOutputHelper) consumerClosed() { - h.rowIdx = h.maxRowIdx -} - // Stats returns output statistics. func (h *ProcOutputHelper) Stats() execinfrapb.OutputStats { return execinfrapb.OutputStats{ @@ -843,30 +838,16 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. // // so that the caller doesn't mistakenly use old ctx object. func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context { - return pb.startImpl(ctx, true /* createSpan */, name) -} - -// StartInternalNoSpan does the same as StartInternal except that it does not -// start a span. This is used by pass-through components whose goal is to be a -// silent translation layer for components that actually do work (e.g. a -// planNodeToRowSource wrapping an insertNode, or a columnarizer wrapping a -// rowexec flow). -func (pb *ProcessorBaseNoHelper) StartInternalNoSpan(ctx context.Context) context.Context { - return pb.startImpl(ctx, false /* createSpan */, "") -} - -func (pb *ProcessorBaseNoHelper) startImpl( - ctx context.Context, createSpan bool, spanName string, -) context.Context { pb.origCtx = ctx - if createSpan { - pb.Ctx, pb.span = ProcessorSpan(ctx, spanName) + pb.Ctx = ctx + noSpan := pb.FlowCtx != nil && pb.FlowCtx.Cfg != nil && + pb.FlowCtx.Cfg.TestingKnobs.ProcessorNoTracingSpan + if !noSpan { + pb.Ctx, pb.span = ProcessorSpan(ctx, name) if pb.span != nil && pb.span.IsVerbose() { pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String())) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID))) } - } else { - pb.Ctx = ctx } pb.EvalCtx.Context = pb.Ctx return pb.Ctx @@ -882,31 +863,7 @@ func (pb *ProcessorBaseNoHelper) startImpl( // if pb.InternalClose() { // // Perform processor specific close work. // } -func (pb *ProcessorBase) InternalClose() bool { - return pb.InternalCloseEx(nil /* onClose */) -} - -// InternalCloseEx is like InternalClose, but also takes a closure to run in -// case the processor was not already closed. The closure is run before the -// processor's span is finished, so the closure can finalize work that relies on -// that span (e.g. async work previously started by the processor that has -// captured the processor's span). -func (pb *ProcessorBase) InternalCloseEx(onClose func()) bool { - closing := pb.ProcessorBaseNoHelper.InternalCloseEx(onClose) - if closing { - // This prevents Next() from returning more rows. - pb.OutputHelper.consumerClosed() - } - return closing -} - -// InternalClose is the meat of ProcessorBase.InternalClose. func (pb *ProcessorBaseNoHelper) InternalClose() bool { - return pb.InternalCloseEx(nil /* onClose */) -} - -// InternalCloseEx is the meat of ProcessorBase.InternalCloseEx. -func (pb *ProcessorBaseNoHelper) InternalCloseEx(onClose func()) bool { // Protection around double closing is useful for allowing ConsumerClosed() to // be called on processors that have already closed themselves by moving to // StateTrailingMeta. @@ -917,10 +874,6 @@ func (pb *ProcessorBaseNoHelper) InternalCloseEx(onClose func()) bool { input.ConsumerClosed() } - if onClose != nil { - onClose() - } - pb.Closed = true pb.span.Finish() pb.span = nil diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 082715c7b4f2..957fb11c3da1 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -288,6 +288,10 @@ type TestingKnobs struct { // IndexBackfillMergerTestingKnobs are the index backfill merger specific // testing knobs. IndexBackfillMergerTestingKnobs base.ModuleTestingKnobs + + // ProcessorNoTracingSpan is used to disable the creation of a tracing span + // in ProcessorBase.StartInternal if the tracing is enabled. + ProcessorNoTracingSpan bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/sql/execinfrapb/BUILD.bazel b/pkg/sql/execinfrapb/BUILD.bazel index 5e41bd2d8de7..14cc780f5517 100644 --- a/pkg/sql/execinfrapb/BUILD.bazel +++ b/pkg/sql/execinfrapb/BUILD.bazel @@ -54,7 +54,6 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", - "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 9277127f562b..5d56c51a4f69 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/dustin/go-humanize" "github.com/gogo/protobuf/types" @@ -59,15 +58,15 @@ func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID { } } -// FlowIDTagKey is the key used for flow id tags in tracing spans. const ( - FlowIDTagKey = tracing.TagPrefix + "flowid" + // FlowIDTagKey is the key used for flow id tags in tracing spans. + FlowIDTagKey = "cockroach.flowid" // StreamIDTagKey is the key used for stream id tags in tracing spans. - StreamIDTagKey = tracing.TagPrefix + "streamid" + StreamIDTagKey = "cockroach.streamid" // ProcessorIDTagKey is the key used for processor id tags in tracing spans. - ProcessorIDTagKey = tracing.TagPrefix + "processorid" + ProcessorIDTagKey = "cockroach.processorid" ) // StatsForQueryPlan returns the statistics as a list of strings that can be diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 787dc7859ce2..8224ea387cbb 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -264,7 +264,7 @@ func (ih *instrumentationHelper) Setup( } if sp := tracing.SpanFromContext(ctx); sp != nil { - if sp.IsVerbose() && !cfg.TestingKnobs.NoStatsCollectionWithVerboseTracing { + if sp.IsVerbose() { // If verbose tracing was enabled at a higher level, stats // collection is enabled so that stats are shown in the traces, but // no extra work is needed by the instrumentationHelper. diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 41a81b511011..47e8c9c32252 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1207,9 +1207,9 @@ func (t *logicTest) newCluster( tempStorageDiskLimit := int64(512 << 20) /* 512 MiB */ // MVCC range tombstones are only available in 22.2 or newer. supportsMVCCRangeTombstones := (t.cfg.BootstrapVersion.Equal(roachpb.Version{}) || - !t.cfg.BootstrapVersion.Less(roachpb.Version{Major: 22, Minor: 2})) && + !t.cfg.BootstrapVersion.Less(clusterversion.ByKey(clusterversion.SetSystemUsersUserIDColumnNotNull))) && (t.cfg.BinaryVersion.Equal(roachpb.Version{}) || - !t.cfg.BinaryVersion.Less(roachpb.Version{Major: 22, Minor: 2})) + !t.cfg.BinaryVersion.Less(clusterversion.ByKey(clusterversion.SetSystemUsersUserIDColumnNotNull))) ignoreMVCCRangeTombstoneErrors := supportsMVCCRangeTombstones && (globalMVCCRangeTombstone || useMVCCRangeTombstonesForPointDeletes) @@ -1708,7 +1708,7 @@ CREATE DATABASE test; USE test; t.Fatal(err) } - if !t.cfg.BootstrapVersion.Equal(roachpb.Version{}) && t.cfg.BootstrapVersion.Less(roachpb.Version{Major: 22, Minor: 2}) { + if !t.cfg.BootstrapVersion.Equal(roachpb.Version{}) && t.cfg.BootstrapVersion.Less(clusterversion.ByKey(clusterversion.SetSystemUsersUserIDColumnNotNull)) { // Hacky way to create user with an ID if we're on a // bootstrapped binary less than 22.2. The version gate // causes the regular CREATE USER to fail since it will not diff --git a/pkg/sql/logictest/logictestbase/BUILD.bazel b/pkg/sql/logictest/logictestbase/BUILD.bazel index 8125bcc75765..4552df7c46c9 100644 --- a/pkg/sql/logictest/logictestbase/BUILD.bazel +++ b/pkg/sql/logictest/logictestbase/BUILD.bazel @@ -9,6 +9,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/build", + "//pkg/clusterversion", "//pkg/roachpb", "//pkg/util", ], diff --git a/pkg/sql/logictest/logictestbase/logictestbase.go b/pkg/sql/logictest/logictestbase/logictestbase.go index a4fb3743cf28..8d30585deff5 100644 --- a/pkg/sql/logictest/logictestbase/logictestbase.go +++ b/pkg/sql/logictest/logictestbase/logictestbase.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/build" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util" ) @@ -462,8 +463,8 @@ var LogicTestConfigs = []TestClusterConfig{ Name: "local-mixed-22.1-22.2", NumNodes: 1, OverrideDistSQLMode: "off", - BootstrapVersion: roachpb.Version{Major: 22, Minor: 1}, - BinaryVersion: roachpb.Version{Major: 22, Minor: 2}, + BootstrapVersion: clusterversion.ByKey(clusterversion.V22_1), + BinaryVersion: clusterversion.ByKey(clusterversion.PrioritizeSnapshots), //TODO: switch to 22.2. DisableUpgrade: true, DeclarativeCorpusCollection: true, }, diff --git a/pkg/sql/logictest/testdata/logic_test/new_schema_changer_mixed b/pkg/sql/logictest/testdata/logic_test/new_schema_changer_mixed index 7785d61e731a..1673e8f1d4d3 100644 --- a/pkg/sql/logictest/testdata/logic_test/new_schema_changer_mixed +++ b/pkg/sql/logictest/testdata/logic_test/new_schema_changer_mixed @@ -24,6 +24,9 @@ statement ok SET use_declarative_schema_changer = unsafe_always; # Verify that DDL stmts only supported in v22.2 will cause a panic. +statement error pq: \*tree\.AlterTable not implemented in the new schema changer +ALTER TABLE testdb.testsc.t ADD COLUMN j INT NOT NULL DEFAULT 30; + statement error pq: \*tree\.AlterTable not implemented in the new schema changer ALTER TABLE testdb.testsc.t DROP COLUMN j; @@ -61,9 +64,6 @@ statement error pq: \*tree\.DropIndex not implemented in the new schema changer DROP INDEX testdb.testsc.t@idx # Verify that DDL stmts supported in v22.1 will succeed. -statement ok -ALTER TABLE testdb.testsc.t ADD COLUMN j INT NOT NULL DEFAULT 30; - statement ok DROP TYPE testdb.testsc.typ; diff --git a/pkg/sql/opt/exec/execbuilder/testdata/delete b/pkg/sql/opt/exec/execbuilder/testdata/delete index 951b49f347f9..1ffc590ee3a4 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/delete +++ b/pkg/sql/opt/exec/execbuilder/testdata/delete @@ -230,10 +230,10 @@ query TT SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%DelRange%' OR message LIKE '%DelRng%' ---- -batch flow coordinator DelRange /Table/110/1 - /Table/110/2 -dist sender send r52: sending batch 1 DelRng to (n1,s1):1 -batch flow coordinator DelRange /Table/110/1/601/0 - /Table/110/2 -dist sender send r52: sending batch 1 DelRng to (n1,s1):1 +delete range DelRange /Table/110/1 - /Table/110/2 +dist sender send r52: sending batch 1 DelRng to (n1,s1):1 +delete range DelRange /Table/110/1/601/0 - /Table/110/2 +dist sender send r52: sending batch 1 DelRng to (n1,s1):1 # Ensure that DelRange requests are autocommitted when DELETE FROM happens on a # chunk of fewer than 600 keys. @@ -251,8 +251,8 @@ query TT SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%Del%' OR message LIKE '%sending batch%' ---- -batch flow coordinator Del /Table/110/1/5/0 -dist sender send r52: sending batch 1 Del, 1 EndTxn to (n1,s1):1 +delete range Del /Table/110/1/5/0 +dist sender send r52: sending batch 1 Del, 1 EndTxn to (n1,s1):1 # Ensure that we send DelRanges when doing a point delete operation on a table # that has multiple column families. @@ -270,8 +270,8 @@ query TT SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%Del%' OR message LIKE '%sending batch%' ---- -batch flow coordinator DelRange /Table/111/1/5 - /Table/111/1/6 -dist sender send r52: sending batch 1 DelRng to (n1,s1):1 +delete range DelRange /Table/111/1/5 - /Table/111/1/6 +dist sender send r52: sending batch 1 DelRng to (n1,s1):1 statement ok CREATE TABLE xyz ( diff --git a/pkg/sql/opt/exec/execbuilder/testdata/select b/pkg/sql/opt/exec/execbuilder/testdata/select index 3b31ee42be84..2c955ace5eb2 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/select +++ b/pkg/sql/opt/exec/execbuilder/testdata/select @@ -36,17 +36,19 @@ WHERE message LIKE '%SPAN START%' OR message LIKE '%pos%executing%'; 5 === SPAN START: optimizer === optimizer 6 === SPAN START: consuming rows === consuming rows 7 === SPAN START: flow === flow +8 === SPAN START: values === values 3 [Open pos:?] executing ExecStmt: COMMIT TRANSACTION sql txn -8 === SPAN START: sql query === sql query -9 === SPAN START: commit sql txn === commit sql txn +9 === SPAN START: sql query === sql query +10 === SPAN START: commit sql txn === commit sql txn 0 [NoTxn pos:?] executing ExecStmt: SELECT 2 session recording -10 === SPAN START: sql txn === sql txn -10 [Open pos:?] executing ExecStmt: SELECT 2 sql txn -11 === SPAN START: sql query === sql query -12 === SPAN START: optimizer === optimizer -13 === SPAN START: consuming rows === consuming rows -14 === SPAN START: flow === flow -15 === SPAN START: commit sql txn === commit sql txn +11 === SPAN START: sql txn === sql txn +11 [Open pos:?] executing ExecStmt: SELECT 2 sql txn +12 === SPAN START: sql query === sql query +13 === SPAN START: optimizer === optimizer +14 === SPAN START: consuming rows === consuming rows +15 === SPAN START: flow === flow +16 === SPAN START: values === values +17 === SPAN START: commit sql txn === commit sql txn 0 [NoTxn pos:?] executing Sync session recording 0 [NoTxn pos:?] executing ExecStmt: SET TRACING = off session recording diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace b/pkg/sql/opt/exec/execbuilder/testdata/show_trace index 4a75e41327af..1881a5aff68a 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace @@ -50,11 +50,11 @@ SET tracing = on,kv,results; CREATE DATABASE t; SET tracing = off query TT $trace_query ---- -batch flow coordinator CPut /NamespaceTable/30/1/106/0/"public"/4/1 -> 107 -batch flow coordinator CPut /Table/3/1/107/2/1 -> schema: version:1 parent_id:106 privileges: users: users: owner_proto:"admin" version:2 > > -batch flow coordinator CPut /NamespaceTable/30/1/0/0/"t"/4/1 -> 106 -batch flow coordinator CPut /Table/3/1/106/2/1 -> database: version:1 privileges: users: users: owner_proto:"root" version:2 > schemas: > state:PUBLIC offline_reason:"" default_privileges: > -sql query rows affected: 0 +create database CPut /NamespaceTable/30/1/106/0/"public"/4/1 -> 107 +create database CPut /Table/3/1/107/2/1 -> schema: version:1 parent_id:106 privileges: users: users: owner_proto:"admin" version:2 > > +create database CPut /NamespaceTable/30/1/0/0/"t"/4/1 -> 106 +create database CPut /Table/3/1/106/2/1 -> database: version:1 privileges: users: users: owner_proto:"root" version:2 > schemas: > state:PUBLIC offline_reason:"" default_privileges: > +sql query rows affected: 0 # More KV operations. @@ -64,9 +64,9 @@ SET tracing = on,kv,results; CREATE TABLE t.kv(k INT PRIMARY KEY, v INT, FAMILY query TT $trace_query ---- -batch flow coordinator CPut /NamespaceTable/30/1/106/107/"kv"/4/1 -> 108 -batch flow coordinator CPut /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:2 privileges: users: owner_proto:"root" version:2 > next_mutation_id:1 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:2 import_start_wall_time:0 > -sql query rows affected: 0 +create table CPut /NamespaceTable/30/1/106/107/"kv"/4/1 -> 108 +create table CPut /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:2 privileges: users: owner_proto:"root" version:2 > next_mutation_id:1 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:2 import_start_wall_time:0 > +sql query rows affected: 0 # We avoid using the full trace output, because that would make the # ensuing trace especially chatty, as it traces the index backfill at @@ -80,8 +80,8 @@ SET tracing = on,kv,results; CREATE UNIQUE INDEX woo ON t.kv(v); SET tracing = o query TT $trace_query ---- -batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:4 privileges: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:2 not_visible:false > state:BACKFILLING direction:ADD mutation_id:1 rollback:false > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:true created_at_nanos:... constraint_id:3 not_visible:false > state:DELETE_ONLY direction:ADD mutation_id:1 rollback:false > next_mutation_id:2 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:4 import_start_wall_time:0 > -sql query rows affected: 0 +create index Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:4 privileges: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:2 not_visible:false > state:BACKFILLING direction:ADD mutation_id:1 rollback:false > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:true created_at_nanos:... constraint_id:3 not_visible:false > state:DELETE_ONLY direction:ADD mutation_id:1 rollback:false > next_mutation_id:2 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:4 import_start_wall_time:0 > +sql query rows affected: 0 statement ok SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing = off @@ -89,10 +89,10 @@ SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing = query TT $trace_query ---- -batch flow coordinator CPut /Table/108/1/1/0 -> /TUPLE/2:2:Int/2 -batch flow coordinator InitPut /Table/108/2/2/0 -> /BYTES/0x89 -batch flow coordinator fast path completed -sql query rows affected: 1 +count CPut /Table/108/1/1/0 -> /TUPLE/2:2:Int/2 +count InitPut /Table/108/2/2/0 -> /BYTES/0x89 +count fast path completed +sql query rows affected: 1 statement error duplicate key value @@ -102,9 +102,9 @@ query TT set tracing=off; $trace_query ---- -batch flow coordinator CPut /Table/108/1/1/0 -> /TUPLE/2:2:Int/2 -batch flow coordinator InitPut /Table/108/2/2/0 -> /BYTES/0x89 -sql query execution failed after 0 rows: duplicate key value violates unique constraint "kv_pkey" +count CPut /Table/108/1/1/0 -> /TUPLE/2:2:Int/2 +count InitPut /Table/108/2/2/0 -> /BYTES/0x89 +sql query execution failed after 0 rows: duplicate key value violates unique constraint "kv_pkey" statement error duplicate key value SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (2,2); SET tracing = off @@ -113,9 +113,9 @@ query TT set tracing=off; $trace_query ---- -batch flow coordinator CPut /Table/108/1/2/0 -> /TUPLE/2:2:Int/2 -batch flow coordinator InitPut /Table/108/2/2/0 -> /BYTES/0x8a -sql query execution failed after 0 rows: duplicate key value violates unique constraint "woo" +count CPut /Table/108/1/2/0 -> /TUPLE/2:2:Int/2 +count InitPut /Table/108/2/2/0 -> /BYTES/0x8a +sql query execution failed after 0 rows: duplicate key value violates unique constraint "woo" statement ok SET tracing = on,kv,results; CREATE TABLE t.kv2 AS TABLE t.kv; @@ -126,9 +126,9 @@ SET tracing = off query TT $trace_query ---- -batch flow coordinator CPut /NamespaceTable/30/1/106/107/"kv2"/4/1 -> 109 -batch flow coordinator CPut /Table/3/1/109/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:false default_expr:"unique_rowid()" hidden:true inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:4 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:2 privileges: users: owner_proto:"root" version:2 > next_mutation_id:1 format_version:3 state:ADD offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"TABLE t.public.kv" create_as_of_time:<> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:2 import_start_wall_time:0 > -sql query rows affected: 0 +create table CPut /NamespaceTable/30/1/106/107/"kv2"/4/1 -> 109 +create table CPut /Table/3/1/109/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:false default_expr:"unique_rowid()" hidden:true inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:4 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:2 privileges: users: owner_proto:"root" version:2 > next_mutation_id:1 format_version:3 state:ADD offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"TABLE t.public.kv" create_as_of_time:<> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:2 import_start_wall_time:0 > +sql query rows affected: 0 statement ok SET tracing = on,kv,results; UPDATE t.kv2 SET v = v + 2; @@ -139,11 +139,11 @@ SET tracing = off query TT $trace_query ---- -colbatchscan Scan /Table/109/{1-2} -colbatchscan fetched: /kv2/kv2_pkey/-9222809086901354496/k/v -> /1/2 -batch flow coordinator Put /Table/109/1/-9222809086901354496/0 -> /TUPLE/1:1:Int/1/1:2:Int/4 -batch flow coordinator fast path completed -sql query rows affected: 1 +colbatchscan Scan /Table/109/{1-2} +colbatchscan fetched: /kv2/kv2_pkey/-9222809086901354496/k/v -> /1/2 +count Put /Table/109/1/-9222809086901354496/0 -> /TUPLE/1:1:Int/1/1:2:Int/4 +count fast path completed +sql query rows affected: 1 statement ok SET tracing = on,kv,results; DELETE FROM t.kv2; SET tracing = off @@ -151,9 +151,9 @@ SET tracing = on,kv,results; DELETE FROM t.kv2; SET tracing = off query TT $trace_query ---- -batch flow coordinator DelRange /Table/109/1 - /Table/109/2 -batch flow coordinator fast path completed -sql query rows affected: 1 +delete range DelRange /Table/109/1 - /Table/109/2 +delete range fast path completed +sql query rows affected: 1 statement ok SET tracing = on,kv,results; DROP TABLE t.kv2 @@ -177,12 +177,12 @@ SET tracing = off query TT $trace_query ---- -colbatchscan Scan /Table/108/{1-2} -colbatchscan fetched: /kv/kv_pkey/1/v -> /2 -batch flow coordinator Del /Table/108/2/2/0 -batch flow coordinator Del /Table/108/1/1/0 -batch flow coordinator fast path completed -sql query rows affected: 1 +colbatchscan Scan /Table/108/{1-2} +colbatchscan fetched: /kv/kv_pkey/1/v -> /2 +count Del /Table/108/2/2/0 +count Del /Table/108/1/1/0 +count fast path completed +sql query rows affected: 1 statement ok SET tracing = on,kv,results; DROP INDEX t.kv@woo CASCADE @@ -193,8 +193,8 @@ SET tracing = off query TT $trace_query ---- -batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:4 privileges: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:2 not_visible:false > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:4 import_start_wall_time:0 > -sql query rows affected: 0 +drop index Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 not_visible:false > next_index_id:4 privileges: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:2 not_visible:false > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false refresh_view_required:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:4 import_start_wall_time:0 > +sql query rows affected: 0 statement ok SET tracing = on,kv,results; DROP TABLE t.kv diff --git a/pkg/sql/opt/exec/execbuilder/testdata/upsert b/pkg/sql/opt/exec/execbuilder/testdata/upsert index 59d2837c5082..855645857de4 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/upsert +++ b/pkg/sql/opt/exec/execbuilder/testdata/upsert @@ -699,11 +699,11 @@ query TT SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch' ---- -colbatchscan Scan /Table/120/1/2/0 -batch flow coordinator CPut /Table/120/1/2/0 -> /TUPLE/2:2:Int/3 -batch flow coordinator InitPut /Table/120/2/3/0 -> /BYTES/0x8a -batch flow coordinator fast path completed -sql query rows affected: 1 +colbatchscan Scan /Table/120/1/2/0 +count CPut /Table/120/1/2/0 -> /TUPLE/2:2:Int/3 +count InitPut /Table/120/2/3/0 -> /BYTES/0x8a +count fast path completed +sql query rows affected: 1 statement ok SET tracing = on,kv,results; UPSERT INTO t.kv(k, v) VALUES (1,2); SET tracing = off @@ -712,11 +712,11 @@ query TT SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch' ---- -colbatchscan Scan /Table/120/1/1/0 -batch flow coordinator CPut /Table/120/1/1/0 -> /TUPLE/2:2:Int/2 -batch flow coordinator InitPut /Table/120/2/2/0 -> /BYTES/0x89 -batch flow coordinator fast path completed -sql query rows affected: 1 +colbatchscan Scan /Table/120/1/1/0 +count CPut /Table/120/1/1/0 -> /TUPLE/2:2:Int/2 +count InitPut /Table/120/2/2/0 -> /BYTES/0x89 +count fast path completed +sql query rows affected: 1 statement error duplicate key value SET tracing = on,kv,results; UPSERT INTO t.kv(k, v) VALUES (2,2); SET tracing = off @@ -726,9 +726,9 @@ set tracing=off; SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch' ---- -colbatchscan Scan /Table/120/1/2/0 -colbatchscan fetched: /kv/kv_pkey/2/v -> /3 -batch flow coordinator Put /Table/120/1/2/0 -> /TUPLE/2:2:Int/2 -batch flow coordinator Del /Table/120/2/3/0 -batch flow coordinator CPut /Table/120/2/2/0 -> /BYTES/0x8a (expecting does not exist) -sql query execution failed after 0 rows: duplicate key value violates unique constraint "woo" +colbatchscan Scan /Table/120/1/2/0 +colbatchscan fetched: /kv/kv_pkey/2/v -> /3 +count Put /Table/120/1/2/0 -> /TUPLE/2:2:Int/2 +count Del /Table/120/2/3/0 +count CPut /Table/120/2/2/0 -> /BYTES/0x8a (expecting does not exist) +sql query execution failed after 0 rows: duplicate key value violates unique constraint "woo" diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index 5f1d7085071a..aff19ac2af42 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -141,7 +141,7 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS } func (p *planNodeToRowSource) Start(ctx context.Context) { - ctx = p.StartInternalNoSpan(ctx) + ctx = p.StartInternal(ctx, nodeName(p.node)) p.params.ctx = ctx // This starts all of the nodes below this node. if err := startExec(p.params, p.node); err != nil { diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 9aacf6891448..fc254d795340 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -311,13 +311,18 @@ func (ps *projectSetProcessor) toEncDatum(d tree.Datum, colIdx int) rowenc.EncDa } func (ps *projectSetProcessor) close() { - ps.InternalCloseEx(func() { - for _, gen := range ps.gens { - if gen != nil { - gen.Close(ps.Ctx) - } + if ps.Closed { + return + } + // Close all generator functions before the context is replaced in + // InternalClose(). + for i, gen := range ps.gens { + if gen != nil { + gen.Close(ps.Ctx) + ps.gens[i] = nil } - }) + } + ps.InternalClose() } // ConsumerClosed is part of the RowSource interface. diff --git a/pkg/sql/schemachanger/scbuild/build.go b/pkg/sql/schemachanger/scbuild/build.go index 6fbf279b6dd6..2c76fc3c9da6 100644 --- a/pkg/sql/schemachanger/scbuild/build.go +++ b/pkg/sql/schemachanger/scbuild/build.go @@ -89,12 +89,18 @@ func Build( Authorization: els.authorization, } current := make([]scpb.Status, 0, len(bs.output)) + version := dependencies.ClusterSettings().Version.ActiveVersion(ctx) for _, e := range bs.output { if e.metadata.Size() == 0 { // Exclude targets which weren't explicitly set. // Explicitly-set targets have non-zero values in the target metadata. continue } + // Exclude targets which are not yet usable in the currently active + // cluster version. + if !version.IsActive(screl.MinVersion(e.element)) { + continue + } ts.Targets = append(ts.Targets, scpb.MakeTarget(e.target, e.element, &e.metadata)) current = append(current, e.current) } diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table.go index 61abef4987b0..fb6ead021d4e 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table.go @@ -47,7 +47,7 @@ type supportedAlterTableCommand struct { // declarative schema changer. Operations marked as non-fully supported can // only be with the use_declarative_schema_changer session variable. var supportedAlterTableStatements = map[reflect.Type]supportedAlterTableCommand{ - reflect.TypeOf((*tree.AlterTableAddColumn)(nil)): {fn: alterTableAddColumn, on: true, minSupportedClusterVersion: clusterversion.V22_1}, + reflect.TypeOf((*tree.AlterTableAddColumn)(nil)): {fn: alterTableAddColumn, on: true, minSupportedClusterVersion: clusterversion.Start22_2}, reflect.TypeOf((*tree.AlterTableDropColumn)(nil)): {fn: alterTableDropColumn, on: true, minSupportedClusterVersion: clusterversion.Start22_2}, reflect.TypeOf((*tree.AlterTableAlterPrimaryKey)(nil)): {fn: alterTableAlterPrimaryKey, on: true, minSupportedClusterVersion: clusterversion.Start22_2}, reflect.TypeOf((*tree.AlterTableAddConstraint)(nil)): {fn: alterTableAddConstraint, on: true, extraChecks: func( diff --git a/pkg/sql/schemachanger/screl/BUILD.bazel b/pkg/sql/schemachanger/screl/BUILD.bazel index 56f031f84c65..0e2b08ddd320 100644 --- a/pkg/sql/schemachanger/screl/BUILD.bazel +++ b/pkg/sql/schemachanger/screl/BUILD.bazel @@ -17,6 +17,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/screl", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/descpb", diff --git a/pkg/sql/schemachanger/screl/scalars.go b/pkg/sql/schemachanger/screl/scalars.go index 673c1c46b5e7..b8d8d24b581f 100644 --- a/pkg/sql/schemachanger/screl/scalars.go +++ b/pkg/sql/schemachanger/screl/scalars.go @@ -11,6 +11,7 @@ package screl import ( + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" @@ -91,3 +92,29 @@ func ContainsDescID(haystack scpb.Element, needle catid.DescID) (contains bool) }) return contains } + +// MinVersion returns the minimum cluster version at which an element may +// be used. +func MinVersion(el scpb.Element) clusterversion.Key { + switch el.(type) { + case *scpb.Database, *scpb.Schema, *scpb.View, *scpb.Sequence, *scpb.Table, + *scpb.AliasType, *scpb.ColumnFamily, *scpb.Column, *scpb.PrimaryIndex, + *scpb.SecondaryIndex, *scpb.TemporaryIndex, *scpb.EnumType, + *scpb.UniqueWithoutIndexConstraint, *scpb.CheckConstraint, + *scpb.ForeignKeyConstraint, *scpb.TableComment, *scpb.RowLevelTTL, + *scpb.TableLocalityGlobal, *scpb.TableLocalityPrimaryRegion, + *scpb.TableLocalitySecondaryRegion, *scpb.TableLocalityRegionalByRow, + *scpb.ColumnName, *scpb.ColumnType, *scpb.ColumnDefaultExpression, + *scpb.ColumnOnUpdateExpression, *scpb.SequenceOwner, *scpb.ColumnComment, + *scpb.IndexName, *scpb.IndexPartitioning, *scpb.SecondaryIndexPartial, + *scpb.IndexComment, *scpb.ConstraintName, *scpb.ConstraintComment, + *scpb.Namespace, *scpb.Owner, *scpb.UserPrivileges, + *scpb.DatabaseRegionConfig, *scpb.DatabaseRoleSetting, *scpb.DatabaseComment, + *scpb.SchemaParent, *scpb.SchemaComment, *scpb.ObjectParent: + return clusterversion.V22_1 + case *scpb.IndexColumn, *scpb.EnumTypeValue, *scpb.TableZoneConfig: + return clusterversion.UseDelRangeInGCJob + default: + panic(errors.AssertionFailedf("unknown element %T", el)) + } +} diff --git a/pkg/sql/schemachanger/screl/scalars_test.go b/pkg/sql/schemachanger/screl/scalars_test.go index 1686692a89bb..1c19326114b3 100644 --- a/pkg/sql/schemachanger/screl/scalars_test.go +++ b/pkg/sql/schemachanger/screl/scalars_test.go @@ -24,11 +24,24 @@ import ( // TestAllElementsHaveDescID ensures that all element types have a DescID. func TestAllElementsHaveDescID(t *testing.T) { + forEachElementType(func(elem scpb.Element) { + require.Equalf(t, descpb.ID(0), GetDescID(elem), "elem %T", elem) + }) +} + +func TestAllElementsHaveMinVersion(t *testing.T) { + forEachElementType(func(elem scpb.Element) { + // If `elem` does not have a min version, the following function call will panic. + MinVersion(elem) + }) +} + +func forEachElementType(f func(element scpb.Element)) { typ := reflect.TypeOf((*scpb.ElementProto)(nil)).Elem() for i := 0; i < typ.NumField(); i++ { - f := typ.Field(i) - elem := reflect.New(f.Type.Elem()).Interface().(scpb.Element) - require.Equal(t, descpb.ID(0), GetDescID(elem)) + field := typ.Field(i) + elem := reflect.New(field.Type.Elem()).Interface().(scpb.Element) + f(elem) } } diff --git a/pkg/sql/tests/autocommit_extended_protocol_test.go b/pkg/sql/tests/autocommit_extended_protocol_test.go index 2c21800d9e7a..1b0f17660bbc 100644 --- a/pkg/sql/tests/autocommit_extended_protocol_test.go +++ b/pkg/sql/tests/autocommit_extended_protocol_test.go @@ -58,7 +58,7 @@ func TestInsertFastPathExtendedProtocol(t *testing.T) { var msg, operation string err = rows.Scan(&msg, &operation) require.NoError(t, err) - if msg == "autocommit enabled" && operation == "batch flow coordinator" { + if msg == "autocommit enabled" && operation == "count" { fastPathEnabled = true } } diff --git a/pkg/upgrade/upgrademanager/BUILD.bazel b/pkg/upgrade/upgrademanager/BUILD.bazel index babb9a750cef..94da57089cd1 100644 --- a/pkg/upgrade/upgrademanager/BUILD.bazel +++ b/pkg/upgrade/upgrademanager/BUILD.bazel @@ -50,7 +50,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", - "//pkg/sql", + "//pkg/sql/execinfra", "//pkg/sql/sqlutil", "//pkg/testutils", "//pkg/testutils/serverutils", diff --git a/pkg/upgrade/upgrademanager/manager_external_test.go b/pkg/upgrade/upgrademanager/manager_external_test.go index d35775541221..2a62c545c4be 100644 --- a/pkg/upgrade/upgrademanager/manager_external_test.go +++ b/pkg/upgrade/upgrademanager/manager_external_test.go @@ -28,7 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -72,9 +72,9 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { BinaryVersionOverride: startCV.Version, DisableAutomaticVersionUpgrade: make(chan struct{}), }, - SQLExecutor: &sql.ExecutorTestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ // See the TODO below for why we need this. - NoStatsCollectionWithVerboseTracing: true, + ProcessorNoTracingSpan: true, }, UpgradeManager: &upgrade.TestingKnobs{ ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion { @@ -188,13 +188,8 @@ RETURNING id;`).Scan(&secondID)) // 'unblock' channel, and this we cannot do until we see the expected // message in the trace. // - // At the moment it works in a very fragile manner (by making sure that - // no processors actually create their own spans). In particular, we - // make sure that the execution statistics are not being collected for - // the statement by: - // - disabling the stats collection in the presence of the verbose - // tracing - // - disabling the sampling altogether. + // At the moment it works in a very fragile manner by making sure that + // no processors actually create their own spans. // // Instead, a different way to observe the status of the upgrade manager // should be introduced and should be used here. diff --git a/pkg/upgrade/upgrades/builtins_test.go b/pkg/upgrade/upgrades/builtins_test.go index a3bfa3cff216..405749fb2243 100644 --- a/pkg/upgrade/upgrades/builtins_test.go +++ b/pkg/upgrade/upgrades/builtins_test.go @@ -46,13 +46,14 @@ func TestIsAtLeastVersionBuiltin(t *testing.T) { ) defer tc.Stopper().Stop(ctx) + v := clusterversion.ByKey(clusterversion.Start22_2).String() // Check that the builtin returns false when comparing against 22.1-2 // version because we are still on 22.1-0. - sqlDB.CheckQueryResults(t, "SELECT crdb_internal.is_at_least_version('22.1-2')", [][]string{{"false"}}) + sqlDB.CheckQueryResults(t, "SELECT crdb_internal.is_at_least_version('"+v+"')", [][]string{{"false"}}) // Run the upgrade. - sqlDB.Exec(t, "SET CLUSTER SETTING version = $1", clusterversion.ByKey(clusterversion.Start22_2).String()) + sqlDB.Exec(t, "SET CLUSTER SETTING version = $1", v) // It should now return true. - sqlDB.CheckQueryResultsRetry(t, "SELECT crdb_internal.is_at_least_version('22.1-2')", [][]string{{"true"}}) + sqlDB.CheckQueryResultsRetry(t, "SELECT crdb_internal.is_at_least_version('"+v+"')", [][]string{{"true"}}) } diff --git a/pkg/upgrade/upgrades/role_id_migration_test.go b/pkg/upgrade/upgrades/role_id_migration_test.go index c6f2e81fa857..c747fce47a9b 100644 --- a/pkg/upgrade/upgrades/role_id_migration_test.go +++ b/pkg/upgrade/upgrades/role_id_migration_test.go @@ -205,6 +205,7 @@ func TestRoleIDMigration100User(t *testing.T) { func TestRoleIDMigration15000Users(t *testing.T) { skip.UnderStress(t) + skip.UnderRace(t) // 15000 is 1.5x the batch size used in the migration. runTestRoleIDMigration(t, 15000) } diff --git a/pkg/upgrade/upgrades/role_options_migration_test.go b/pkg/upgrade/upgrades/role_options_migration_test.go index 2c05d62d51b8..458c0613f1b0 100644 --- a/pkg/upgrade/upgrades/role_options_migration_test.go +++ b/pkg/upgrade/upgrades/role_options_migration_test.go @@ -214,6 +214,7 @@ func TestRoleOptionsMigration100User(t *testing.T) { func TestRoleOptionsMigration15000User(t *testing.T) { skip.UnderStress(t) + skip.UnderRace(t) runTestRoleOptionsMigration(t, 15000) } diff --git a/pkg/util/cgroups/BUILD.bazel b/pkg/util/cgroups/BUILD.bazel index 2603da4c8614..843fe2b64e5b 100644 --- a/pkg/util/cgroups/BUILD.bazel +++ b/pkg/util/cgroups/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/util/log", "//pkg/util/system", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/util/cgroups/cgroups.go b/pkg/util/cgroups/cgroups.go index cceb9d01a304..f7057224bcc7 100644 --- a/pkg/util/cgroups/cgroups.go +++ b/pkg/util/cgroups/cgroups.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/system" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" ) const ( @@ -433,7 +434,10 @@ func readInt64Value( func detectCntrlPath(cgroupFilePath string, controller string) (string, error) { cgroup, err := os.Open(cgroupFilePath) if err != nil { - return "", errors.Wrapf(err, "failed to read %s cgroup from cgroups file: %s", controller, cgroupFilePath) + return "", errors.Wrapf(err, + "failed to read %s cgroup from cgroups file: %s", + redact.Safe(controller), + log.SafeManaged(cgroupFilePath)) } defer func() { _ = cgroup.Close() }() @@ -465,7 +469,7 @@ func detectCntrlPath(cgroupFilePath string, controller string) (string, error) { func getCgroupDetails(mountinfoPath string, cRoot string, controller string) (string, int, error) { info, err := os.Open(mountinfoPath) if err != nil { - return "", 0, errors.Wrapf(err, "failed to read mounts info from file: %s", mountinfoPath) + return "", 0, errors.Wrapf(err, "failed to read mounts info from file: %s", log.SafeManaged(mountinfoPath)) } defer func() { _ = info.Close() diff --git a/pkg/util/envutil/env.go b/pkg/util/envutil/env.go index ebaa553dcc58..7a5285d39423 100644 --- a/pkg/util/envutil/env.go +++ b/pkg/util/envutil/env.go @@ -190,6 +190,9 @@ var safeVarRegistry = map[redact.SafeString]struct{}{ "GODEBUG": {}, "GOMAXPROCS": {}, "GOTRACEBACK": {}, + // gRPC. + "GRPC_GO_LOG_SEVERITY_LEVEL": {}, + "GRPC_GO_LOG_VERBOSITY_LEVEL": {}, } // valueReportableUnsafeVarRegistry is the list of variables where we can @@ -198,21 +201,19 @@ var safeVarRegistry = map[redact.SafeString]struct{}{ // that users would be unhappy to see them enclosed within redaction // markers in log files. var valueReportableUnsafeVarRegistry = map[redact.SafeString]struct{}{ - "DEBUG_HTTP2_GOROUTINES": {}, - "GRPC_GO_LOG_SEVERITY_LEVEL": {}, - "GRPC_GO_LOG_VERBOSITY_LEVEL": {}, - "HOST_IP": {}, - "LANG": {}, - "LC_ALL": {}, - "LC_COLLATE": {}, - "LC_CTYPE": {}, - "LC_TIME": {}, - "LC_NUMERIC": {}, - "LC_MESSAGES": {}, - "LS_METRICS_ENABLED": {}, - "TERM": {}, - "TZ": {}, - "ZONEINFO": {}, + "DEBUG_HTTP2_GOROUTINES": {}, + "HOST_IP": {}, + "LANG": {}, + "LC_ALL": {}, + "LC_COLLATE": {}, + "LC_CTYPE": {}, + "LC_TIME": {}, + "LC_NUMERIC": {}, + "LC_MESSAGES": {}, + "LS_METRICS_ENABLED": {}, + "TERM": {}, + "TZ": {}, + "ZONEINFO": {}, // From the Go runtime. "LOCALDOMAIN": {}, "RES_OPTIONS": {}, @@ -258,9 +259,6 @@ var nameReportableUnsafeVarRegistry = map[redact.SafeString]struct{}{ "GAE_MODULE_NAME": {}, "GAE_PARTITION": {}, "GAE_SERVICE": {}, - // gRPC. - "GRPC_GO_LOG_SEVERITY_LEVEL": {}, - "GRPC_GO_LOG_VERBOSITY_LEVEL": {}, // Kerberos. "KRB5CCNAME": {}, // Pprof. diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index e2d55d6b5a0e..93fcb4d1156a 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -167,6 +167,7 @@ go_test( "//pkg/settings/cluster", "//pkg/util/caller", "//pkg/util/ctxgroup", + "//pkg/util/envutil", "//pkg/util/leaktest", "//pkg/util/log/channel", "//pkg/util/log/logconfig", diff --git a/pkg/util/log/clog.go b/pkg/util/log/clog.go index 42f2f381b461..3da236e43124 100644 --- a/pkg/util/log/clog.go +++ b/pkg/util/log/clog.go @@ -99,6 +99,11 @@ type loggingT struct { // to this logger already. active bool firstUseStack string + + // redactionPolicyManaged indicates whether we're running as part of a managed + // service (sourced from COCKROACH_REDACTION_POLICY_MANAGED env var). Impacts + // log redaction policies for log args marked with SafeManaged. + redactionPolicyManaged bool } allSinkInfos sinkInfoRegistry @@ -217,6 +222,24 @@ func (l *loggingT) signalFatalCh() { } } +// setManagedRedactionPolicy configures the logging setup to indicate if +// we are running as part of a managed service. see SafeManaged for details +// on how this impacts log redaction policies. +func (l *loggingT) setManagedRedactionPolicy(isManaged bool) { + l.mu.Lock() + defer l.mu.Unlock() + l.mu.redactionPolicyManaged = isManaged +} + +// hasManagedRedactionPolicy indicates if the logging setup is being run +// as part of a managed service. see SafeManaged for details on how this +// impacts log redaction policies. +func (l *loggingT) hasManagedRedactionPolicy() bool { + l.mu.Lock() + defer l.mu.Unlock() + return l.mu.redactionPolicyManaged +} + // outputLogEntry marshals a log entry proto into bytes, and writes // the data to the log files. If a trace location is set, stack traces // are added to the entry before marshaling. diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 107d34c4119e..5fa90ca7ba5a 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -16,6 +16,7 @@ import ( "io/fs" "math" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/log/channel" "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/cockroach/pkg/util/log/logflags" @@ -42,6 +43,15 @@ type config struct { var debugLog *loggerT +// redactionPolicyManaged is the env var used to indicate that the node is being +// run as part of a managed service (e.g. CockroachCloud). Certain logged information +// such as filepaths, network addresses, and CLI argument lists are considered +// sensitive information in on-premises deployments. However, when the node is being +// run as part of a managed service (e.g. CockroachCloud), this type of information is +// no longer considered sensitive, and should be logged in an unredacted form to aid +// in support escalations. +const redactionPolicyManagedEnvVar = "COCKROACH_REDACTION_POLICY_MANAGED" + func init() { logflags.InitFlags( &logging.showLogs, @@ -132,6 +142,9 @@ func ApplyConfig(config logconfig.Config) (logShutdownFn func(), err error) { logging.allLoggers.clear() logging.allSinkInfos.clear() + // Indicate whether we're running in a managed environment. Impacts redaction policies. + logging.setManagedRedactionPolicy(envutil.EnvOrDefaultBool(redactionPolicyManagedEnvVar, false)) + // If capture of internal fd2 writes is enabled, set it up here. if config.CaptureFd2.Enable { if logging.testingFd2CaptureLogger != nil { diff --git a/pkg/util/log/log_entry.go b/pkg/util/log/log_entry.go index 61c767efbeb9..6cc9c1173880 100644 --- a/pkg/util/log/log_entry.go +++ b/pkg/util/log/log_entry.go @@ -276,9 +276,9 @@ func (l *sinkInfo) getStartLines(now time.Time) []*buffer { messages := make([]*buffer, 0, 6) messages = append(messages, makeStartLine(f, "file created at: %s", redact.Safe(now.Format("2006/01/02 15:04:05"))), - makeStartLine(f, "running on machine: %s", fullHostName), + makeStartLine(f, "running on machine: %s", SafeManaged(fullHostName)), makeStartLine(f, "binary: %s", redact.Safe(build.GetInfo().Short())), - makeStartLine(f, "arguments: %s", os.Args), + makeStartLine(f, "arguments: %s", SafeManaged(os.Args)), ) // Including a non-ascii character in the first 1024 bytes of the log helps diff --git a/pkg/util/log/redact.go b/pkg/util/log/redact.go index 1c031b1a33b0..96875e1b29a7 100644 --- a/pkg/util/log/redact.go +++ b/pkg/util/log/redact.go @@ -210,3 +210,27 @@ func TestingSetRedactable(redactableLogs bool) (cleanup func()) { func SafeOperational(s interface{}) redact.SafeValue { return redact.Safe(s) } + +// SafeManaged marks the provided argument as safe from a redaction +// perspective in cases where the node is being run as part of a managed +// service. This is indicated via the `COCKROACH_REDACTION_POLICY_MANAGED` +// environment variable. +// +// Certain types of data is normally considered "sensitive" from a +// redaction perspective when logged from on-premises deployments, such +// as CLI arguments and HTTP addresses. However, when running in a +// managed service, such as CockroachCloud, this information is already +// known to the operators and does not need to be treated as sensitive. +// +// NB: If the argument itself implements the redact.SafeFormatter interface, +// then we delegate to its implementation in either case. +// +// NB: This approach is lightweight, but is not sustainable to build on top of. +// We should be looking for more holistic approaches to conditional redaction. +// See https://github.com/cockroachdb/cockroach/issues/87038 for details. +func SafeManaged(a interface{}) interface{} { + if !logging.hasManagedRedactionPolicy() { + return a + } + return redact.Safe(a) +} diff --git a/pkg/util/log/redact_test.go b/pkg/util/log/redact_test.go index c202b9881cf1..7ab8194d55cf 100644 --- a/pkg/util/log/redact_test.go +++ b/pkg/util/log/redact_test.go @@ -18,10 +18,14 @@ import ( "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const startRedactable = "‹" @@ -90,6 +94,54 @@ func TestRedactedLogOutput(t *testing.T) { } } +func TestSafeManaged(t *testing.T) { + defer leaktest.AfterTest(t)() + s := ScopeWithoutShowLogs(t) + defer s.Close(t) + tests := []struct { + name string + arg interface{} + expected redact.RedactableString + redactionPolicyManagedEnabled bool + }{ + { + name: "redacts when not in redaction policy managed mode", + arg: "some value", + expected: redact.Sprint("some value"), + redactionPolicyManagedEnabled: false, + }, + { + name: "marks safe when in redaction policy managed mode", + arg: "some value", + expected: redact.Sprint(redact.Safe("some value")), + redactionPolicyManagedEnabled: true, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Cleanup(func() { + envutil.ClearEnvCache() + }) + + t.Setenv(redactionPolicyManagedEnvVar, fmt.Sprint(tc.redactionPolicyManagedEnabled)) + + TestingResetActive() + cfg := logconfig.DefaultConfig() + if err := cfg.Validate(&s.logDir); err != nil { + t.Fatal(err) + } + cleanupFn, err := ApplyConfig(cfg) + if err != nil { + t.Fatal(err) + } + defer cleanupFn() + + require.Equal(t, logging.hasManagedRedactionPolicy(), tc.redactionPolicyManagedEnabled) + require.Equal(t, tc.expected, redact.Sprint(SafeManaged(tc.arg))) + }) + } +} + func TestRedactedDecodeFile(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index 5dddd76aea64..be42f7088ed3 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -27,11 +27,6 @@ import ( "golang.org/x/net/trace" ) -const ( - // TagPrefix is prefixed to all tags that should be output in SHOW TRACE. - TagPrefix = "cockroach." -) - // Span is the tracing Span that we use in CockroachDB. Depending on the tracing // configuration, it can hold anywhere between zero and three destinations for // trace information: