71037: opt: rename ScalarID to ScalarRank r=mgartner a=mgartner

Previously, every scalar expression (except lists and list items) had an
ID that was said to be unique within the context of a memo. These IDs
were originally added as a way to canonically order filters. Because
they were named "IDs", their use later expanded to checking two scalar
expressions for equality.

Maintaining this uniqueness invariant is difficult in practice and has
dangerous implications when it is violated, as seen in #71002. While two
different scalar expressions with the same ID could certainly cause
problems for sorting filters, using these IDs to check for scalar
expression equality can be catastrophic. For example, a filter
expression that shares an ID with another expression could be completely
removed from the filter.

Unfortunately, there's no obvious way to add test-build assertions that
scalar IDs are in fact unique, as explained in #71035. In order to
lessen the blast radius of breaking this invariant, this commit renames
"scalar ID" to "scalar rank". The comment for this attribute makes no
explicit guarantee of uniqueness. This renaming should urge
contributors to use the value only for ordering scalar expressions
canonically, never for checking scalar expression equality. Instead,
pointer equality should be used to check whether two scalar expressions
are the same.
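
To illustrate the intended division of labor, here is a minimal sketch
with simplified stand-in types (ScalarExpr, Rank, and FiltersItem below
are illustrative, not the optimizer's actual definitions):

    package scratch

    import "sort"

    // ScalarExpr is a stand-in for a scalar expression in the memo.
    type ScalarExpr interface {
        Rank() int // canonical ordering value; uniqueness is NOT guaranteed
    }

    type FiltersItem struct{ Condition ScalarExpr }

    // sortFilters orders filters canonically. Rank is the right tool here:
    // a rank collision can at worst perturb the order of two filters.
    func sortFilters(filters []FiltersItem) {
        sort.SliceStable(filters, func(i, j int) bool {
            return filters[i].Condition.Rank() < filters[j].Condition.Rank()
        })
    }

    // sameExpr reports whether a and b are the same expression. Pointer
    // equality is used; comparing ranks could equate distinct expressions
    // and, e.g., silently drop a filter.
    func sameExpr(a, b ScalarExpr) bool {
        return a == b // pointer equality for pointer-backed implementations
    }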

Release note: None


71056: util/tracing: make some span options into singletons r=andreimatei a=andreimatei

A couple of span-creation options are empty structs implementing the
SpanOption interface. Because the structs are empty, storing one in an
interface doesn't allocate: the compiler optimizes small types stored in
interfaces. Still, the output of `-gcflags=-m` lists the value as
escaping to the heap, which is quite confusing. This patch introduces
singletons for the structs to make it clear that there's no allocation.
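
A minimal sketch of the pattern, assuming a simplified SpanOption
interface (the names below are illustrative, not the exact util/tracing
API):

    package scratch

    type spanConfig struct{ forceRealSpan bool }

    // SpanOption configures span creation.
    type SpanOption interface{ apply(*spanConfig) }

    // forceRealSpanOption is an empty struct. Converting it to a SpanOption
    // does not allocate, but -gcflags=-m still reports it as escaping.
    type forceRealSpanOption struct{}

    func (forceRealSpanOption) apply(c *spanConfig) { c.forceRealSpan = true }

    // WithForceRealSpan is a package-level singleton: every caller reuses
    // this one interface value, so the conversion happens once at package
    // initialization rather than at every call site.
    var WithForceRealSpan SpanOption = forceRealSpanOption{}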

Release note: None

71108: server,*: untangle the Tracer from the Settings r=andreimatei a=andreimatei

See individual commits.
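
The recurring pattern in the diff below: call sites that used to reach
the Tracer through the settings object (serverCfg.Settings.Tracer,
st.Tracer) now receive one directly (serverCfg.Tracer, a passed-in
tracer, or a locally constructed tracing.NewTracer()). A minimal sketch
of the wiring, with stand-in types rather than the real ones:

    package scratch

    type Tracer struct{}

    func NewTracer() *Tracer { return &Tracer{} }

    // Settings no longer carries the Tracer.
    type Settings struct{}

    // Config holds the Tracer as its own field, so the two dependencies
    // can be constructed, injected, and tested independently.
    type Config struct {
        Settings *Settings
        Tracer   *Tracer
    }

    func makeConfig() Config {
        return Config{Settings: &Settings{}, Tracer: NewTracer()}
    }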

Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
3 people committed Oct 7, 2021
4 parents 5b1bd56 + eae5076 + d38e7e2 + 2309d77 commit 9208567
Showing 62 changed files with 322 additions and 180 deletions.
1 change: 1 addition & 0 deletions pkg/base/BUILD.bazel
@@ -32,6 +32,7 @@ go_library(
         "//pkg/util/netutil/addr",
         "//pkg/util/retry",
         "//pkg/util/stop",
+        "//pkg/util/tracing",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_errors//oserror",
3 changes: 3 additions & 0 deletions pkg/base/test_server_args.go
@@ -20,6 +20,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/mon"
     "github.com/cockroachdb/cockroach/pkg/util/retry"
     "github.com/cockroachdb/cockroach/pkg/util/stop"
+    "github.com/cockroachdb/cockroach/pkg/util/tracing"
 )
 
 // TestServerArgs contains the parameters one can set when creating a test
@@ -130,6 +131,7 @@ type TestServerArgs struct {
     // IF set, the demo login endpoint will be enabled.
     EnableDemoLoginEndpoint bool
 
+    Tracer *tracing.Tracer
     // If set, a TraceDir is initialized at the provided path.
     TraceDir string
 
@@ -240,6 +242,7 @@ type TestTenantArgs struct {
     // Settings allows the caller to control the settings object used for the
     // tenant cluster.
     Settings *cluster.Settings
+    Tracer *tracing.Tracer
 
     // AllowSettingClusterSettings, if true, allows the tenant to set in-memory
     // cluster settings.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
@@ -63,7 +63,7 @@ func (t *testServerShim) RPCContext() *rpc.Context
 func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) }
 func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) }
 func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) }
-func (t *testServerShim) Tracer() interface{} { panic(unsupportedShimMethod) }
+func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) }
 func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) }
 func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) }
 func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
4 changes: 2 additions & 2 deletions pkg/cli/start.go
@@ -413,7 +413,7 @@ func runStart(cmd *cobra.Command, args []string, startSingleNode bool)
     // This span concludes when the startup goroutine started below
     // has completed.
     // TODO(andrei): we don't close the span on the early returns below.
-    tracer := serverCfg.Settings.Tracer
+    tracer := serverCfg.Tracer
     startupSpan := tracer.StartSpan("server start")
     ctx = tracing.ContextWithSpan(ctx, startupSpan)
 
@@ -1183,7 +1183,7 @@ func getClientGRPCConn(
     stopper := stop.NewStopper()
     rpcContext := rpc.NewContext(rpc.ContextOptions{
         TenantID:   roachpb.SystemTenantID,
-        AmbientCtx: log.AmbientContext{Tracer: cfg.Settings.Tracer},
+        AmbientCtx: log.AmbientContext{Tracer: cfg.Tracer},
         Config:     cfg.Config,
         Clock:      clock,
         Stopper:    stopper,
2 changes: 1 addition & 1 deletion pkg/jobs/adopt.go
@@ -397,7 +397,7 @@ func (r *Registry) runJob(
     // TODO(ajwerner): Move this writing up the trace ID down into
     // stepThroughStateMachine where we're already often (and soon with
     // exponential backoff, always) updating the job in that call.
-    ctx, span = r.settings.Tracer.StartSpanCtx(ctx, spanName, spanOptions...)
+    ctx, span = r.ac.Tracer.StartSpanCtx(ctx, spanName, spanOptions...)
     defer span.Finish()
     if span.TraceID() != 0 {
         if err := job.Update(ctx, nil /* txn */, func(txn *kv.Txn, md JobMetadata,
4 changes: 2 additions & 2 deletions pkg/kv/bulk/sst_batcher_test.go
@@ -163,7 +163,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
     // still handle an unexpected split, so we make our own range cache and
     // only populate it with one of our two splits.
     mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil,
-        func() int64 { return 2 << 10 }, s.Stopper(), s.Tracer().(*tracing.Tracer))
+        func() int64 { return 2 << 10 }, s.Stopper(), s.TracerI().(*tracing.Tracer))
     addr, err := keys.Addr(key(0))
     if err != nil {
         t.Fatal(err)
@@ -196,7 +196,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
     // However we log an event when forced to retry (in case we need to debug)
     // slow requests or something, so we can inspect the trace in the test to
     // determine if requests required the expected number of retries.
-    tr := s.Tracer().(*tracing.Tracer)
+    tr := s.TracerI().(*tracing.Tracer)
     addCtx, getRec, cancel := tracing.ContextWithRecordingSpan(ctx, tr, "add")
     defer cancel()
     expectedSplitRetries := 0
37 changes: 19 additions & 18 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -48,7 +48,7 @@ import (
 // starting a TestServer, which creates a "real" node and employs a
 // distributed sender server-side.
 
-func startNoSplitMergeServer(t *testing.T) (serverutils.TestServerInterface, *kv.DB) {
+func startNoSplitMergeServer(t *testing.T) (*server.TestServer, *kv.DB) {
     s, _, db := serverutils.StartServer(t, base.TestServerArgs{
         Knobs: base.TestingKnobs{
             Store: &kvserver.StoreTestingKnobs{
@@ -57,7 +57,7 @@ func startNoSplitMergeServer(t *testing.T) (serverutils.TestServerInterface, *kv.DB) {
             },
         },
     })
-    return s, db
+    return s.(*server.TestServer), db
 }
 
 // TestRangeLookupWithOpenTransaction verifies that range lookups are
@@ -76,22 +76,22 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
     now := s.Clock().Now()
     txn := roachpb.MakeTransaction("txn", roachpb.Key("foobar"), 0, now, 0)
     if err := storage.MVCCPutProto(
-        context.Background(), s.(*server.TestServer).Engines()[0],
+        context.Background(), s.Engines()[0],
         nil, key, now, &txn, &roachpb.RangeDescriptor{}); err != nil {
         t.Fatal(err)
     }
 
     // Create a new DistSender and client.DB so that the Get below is guaranteed
     // to not hit in the range descriptor cache forcing a RangeLookup operation.
-    ambient := log.AmbientContext{Tracer: s.ClusterSettings().Tracer}
+    ambient := log.AmbientContext{Tracer: s.TracerI().(*tracing.Tracer)}
     ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
         AmbientCtx:         ambient,
         Settings:           cluster.MakeTestingClusterSettings(),
         Clock:              s.Clock(),
-        NodeDescs:          s.(*server.TestServer).Gossip(),
+        NodeDescs:          s.Gossip(),
         RPCContext:         s.RPCContext(),
-        NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
-        FirstRangeProvider: s.(*server.TestServer).Gossip(),
+        NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.Gossip())),
+        FirstRangeProvider: s.Gossip(),
     })
     tsf := kvcoord.NewTxnCoordSenderFactory(
         kvcoord.TxnCoordSenderFactoryConfig{
@@ -965,13 +965,13 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) {
     manual := hlc.NewManualClock(ts[0].WallTime + 1)
     clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
     ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
-        AmbientCtx:         log.AmbientContext{Tracer: s.ClusterSettings().Tracer},
+        AmbientCtx:         log.AmbientContext{Tracer: s.TracerI().(*tracing.Tracer)},
         Settings:           cluster.MakeTestingClusterSettings(),
         Clock:              clock,
-        NodeDescs:          s.(*server.TestServer).Gossip(),
+        NodeDescs:          s.Gossip(),
         RPCContext:         s.RPCContext(),
-        NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
-        FirstRangeProvider: s.(*server.TestServer).Gossip(),
+        NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.Gossip())),
+        FirstRangeProvider: s.Gossip(),
     })
 
     reply, err := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{
@@ -1174,13 +1174,13 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) {
     // Now, split further at the given keys, but use a new dist sender so
     // we don't update the caches on the default dist sender-backed client.
     ds := kvcoord.NewDistSender(kvcoord.DistSenderConfig{
-        AmbientCtx:         log.AmbientContext{Tracer: s.ClusterSettings().Tracer},
+        AmbientCtx:         log.AmbientContext{Tracer: s.TracerI().(*tracing.Tracer)},
         Clock:              s.Clock(),
-        NodeDescs:          s.(*server.TestServer).Gossip(),
+        NodeDescs:          s.Gossip(),
         RPCContext:         s.RPCContext(),
-        NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.(*server.TestServer).Gossip())),
+        NodeDialer:         nodedialer.New(s.RPCContext(), gossip.AddressResolver(s.Gossip())),
         Settings:           cluster.MakeTestingClusterSettings(),
-        FirstRangeProvider: s.(*server.TestServer).Gossip(),
+        FirstRangeProvider: s.Gossip(),
     })
     for _, key := range []string{"c"} {
         req := &roachpb.AdminSplitRequest{
@@ -2865,10 +2865,11 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run("", func(t *testing.T) {
-            s, _, db := serverutils.StartServer(t,
+            si, _, db := serverutils.StartServer(t,
                 base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}})
             ctx := context.Background()
-            defer s.Stopper().Stop(ctx)
+            defer si.Stopper().Stop(ctx)
+            s := si.(*server.TestServer)
 
             keyA, keyA1, keyB, keyB1 := roachpb.Key("a"), roachpb.Key("a1"), roachpb.Key("b"), roachpb.Key("b1")
             require.NoError(t, setupMultipleRanges(ctx, db, string(keyB)))
@@ -2940,7 +2941,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
             })
 
             require.Regexp(t, "injected", txn.CommitInBatch(ctx, b))
-            tr := s.Tracer().(*tracing.Tracer)
+            tr := s.Tracer()
             err = kvclientutils.CheckPushResult(
                 ctx, db, tr, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation)
             require.NoError(t, err)
5 changes: 3 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -4388,10 +4388,11 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
     } {
         t.Run(tc.name, func(t *testing.T) {
             st := cluster.MakeTestingClusterSettings()
+            tr := tracing.NewTracer()
            getRangeDescCacheSize := func() int64 {
                 return 1 << 20
             }
-            rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper, st.Tracer)
+            rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper, tr)
             rc.Insert(ctx, roachpb.RangeInfo{
                 Desc: desc,
                 Lease: roachpb.Lease{
@@ -4432,7 +4433,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
             }
 
             cfg := DistSenderConfig{
-                AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
+                AmbientCtx: log.AmbientContext{Tracer: tr},
                 Clock:      clock,
                 NodeDescs:  ns,
                 RPCContext: rpcContext,
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/local_test_cluster_util.go
@@ -58,7 +58,7 @@ func InitFactoryForLocalTestCluster(
 ) kv.TxnSenderFactory {
     return NewTxnCoordSenderFactory(
         TxnCoordSenderFactoryConfig{
-            AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
+            AmbientCtx: log.AmbientContext{Tracer: tracer},
             Settings:   st,
             Clock:      clock,
             Stopper:    stopper,
@@ -83,7 +83,7 @@ func NewDistSenderForLocalTestCluster(
     rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
     senderTransportFactory := SenderTransportFactory(tracer, stores)
     return NewDistSender(DistSenderConfig{
-        AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
+        AmbientCtx: log.AmbientContext{Tracer: tracer},
         Settings:   st,
         Clock:      clock,
         NodeDescs:  g,
27 changes: 18 additions & 9 deletions pkg/kv/kvclient/rangecache/range_cache_test.go
@@ -245,6 +245,7 @@ func staticSize(size int64) func() int64 {
 
 func initTestDescriptorDB(t *testing.T) *testDescriptorDB {
     st := cluster.MakeTestingClusterSettings()
+    tr := tracing.NewTracer()
     db := newTestDescriptorDB()
     for i, char := range "abcdefghijklmnopqrstuvwx" {
         // Create splits on each character:
@@ -258,7 +259,7 @@ func initTestDescriptorDB(t *testing.T) *testDescriptorDB {
     }
     // TODO(andrei): don't leak this Stopper. Someone needs to Stop() it.
     db.stopper = stop.NewStopper()
-    db.cache = NewRangeCache(st, db, staticSize(2<<10), db.stopper, st.Tracer)
+    db.cache = NewRangeCache(st, db, staticSize(2<<10), db.stopper, tr)
     return db
 }
 
@@ -481,9 +482,10 @@ func TestLookupByKeyMin(t *testing.T) {
     ctx := context.Background()
 
     st := cluster.MakeTestingClusterSettings()
+    tr := tracing.NewTracer()
     stopper := stop.NewStopper()
     defer stopper.Stop(context.Background())
-    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
+    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
     startToMeta2Desc := roachpb.RangeDescriptor{
         StartKey: roachpb.RKeyMin,
         EndKey:   keys.RangeMetaKey(roachpb.RKey("a")),
@@ -1009,9 +1011,10 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
     }
 
     st := cluster.MakeTestingClusterSettings()
+    tr := tracing.NewTracer()
     stopper := stop.NewStopper()
     defer stopper.Stop(ctx)
-    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
+    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
     cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &CacheEntry{desc: *defDesc})
 
     // Now, add a new, overlapping set of descriptors.
@@ -1185,9 +1188,10 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) {
     for _, tc := range testCases {
         t.Run("", func(t *testing.T) {
             st := cluster.MakeTestingClusterSettings()
+            tr := tracing.NewTracer()
             stopper := stop.NewStopper()
             defer stopper.Stop(ctx)
-            cache := NewRangeCache(st, nil /* db */, staticSize(2<<10), stopper, st.Tracer)
+            cache := NewRangeCache(st, nil /* db */, staticSize(2<<10), stopper, tr)
             for _, d := range tc.cachedDescs {
                 cache.Insert(ctx, roachpb.RangeInfo{Desc: d})
             }
@@ -1237,9 +1241,10 @@ func TestRangeCacheClearOverlappingMeta(t *testing.T) {
     }
 
     st := cluster.MakeTestingClusterSettings()
+    tr := tracing.NewTracer()
     stopper := stop.NewStopper()
     defer stopper.Stop(ctx)
-    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
+    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
     cache.Insert(ctx,
         roachpb.RangeInfo{Desc: firstDesc},
         roachpb.RangeInfo{Desc: restDesc})
@@ -1275,9 +1280,10 @@ func TestGetCachedRangeDescriptorInverted(t *testing.T) {
     }
 
     st := cluster.MakeTestingClusterSettings()
+    tr := tracing.NewTracer()
     stopper := stop.NewStopper()
     defer stopper.Stop(ctx)
-    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
+    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
     for _, rd := range testData {
         cache.Insert(ctx, roachpb.RangeInfo{
             Desc: rd,
@@ -1413,9 +1419,10 @@ func TestRangeCacheGeneration(t *testing.T) {
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
             st := cluster.MakeTestingClusterSettings()
+            tr := tracing.NewTracer()
             stopper := stop.NewStopper()
             defer stopper.Stop(ctx)
-            cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
+            cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
             cache.Insert(ctx, roachpb.RangeInfo{Desc: *descAM2}, roachpb.RangeInfo{Desc: *descMZ4})
             cache.Insert(ctx, roachpb.RangeInfo{Desc: *tc.insertDesc})
 
@@ -1479,9 +1486,10 @@ func TestRangeCacheEvictAndReplace(t *testing.T) {
     startKey := desc1.StartKey
 
     st := cluster.MakeTestingClusterSettings()
+    tr := tracing.NewTracer()
     stopper := stop.NewStopper()
     defer stopper.Stop(ctx)
-    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
+    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
 
     ri := roachpb.RangeInfo{Desc: desc1}
     cache.Insert(ctx, ri)
@@ -1590,9 +1598,10 @@ func TestRangeCacheUpdateLease(t *testing.T) {
     startKey := desc1.StartKey
 
     st := cluster.MakeTestingClusterSettings()
+    tr := tracing.NewTracer()
     stopper := stop.NewStopper()
     defer stopper.Stop(ctx)
-    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, st.Tracer)
+    cache := NewRangeCache(st, nil, staticSize(2<<10), stopper, tr)
 
     cache.Insert(ctx, roachpb.RangeInfo{
         Desc: desc1,
(Diff for the remaining 52 changed files not shown.)
