Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
79682: util/mon: add monitor address to logging messages r=yuzefovich a=yuzefovich

This will allow us to uniquely identify all messages coming from
a single monitor. Additionally, this commit fixes the redactability of
the file name and function name when getting a stack trace.

Release note: None

83959: sql: ensure revalidate_unique_constraint* builtins respect privileges r=rytaft a=rytaft

This commit updates the builtins
`crdb_internal.revalidate_unique_constraints_in_all_tables`,
`crdb_internal.revalidate_unique_constraints_in_table`,
and `crdb_internal.revalidate_unique_constraint` to ensure that the correct
user is passed to the internal executor when running the validation query.
This ensures that privileges will be respected.

Release note (bug fix): Fixed the following builtins so that users can only
run them if they have `SELECT` privileges on the relevant tables:
`crdb_internal.revalidate_unique_constraints_in_all_tables`,
`crdb_internal.revalidate_unique_constraints_in_table`,
and `crdb_internal.revalidate_unique_constraint`.

84099: kvserver: Correct disablingClientStream behavior r=ajwerner a=andrewbaptist

Change disablingClientStream to buffer messages instead of dropping
them when it is in the disabled state. Once it is no longer disabled
send all the outstanding messages. Resolves a flake: #84041

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
  • Loading branch information
4 people committed Jul 8, 2022
4 parents 796a675 + 1fd84d7 + d2715d2 + 7405c56 commit cc22360
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 24 deletions.
47 changes: 47 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/multi_region_privileges
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,50 @@ user testuser
statement ok
ALTER DATABASE repartition_privs ADD REGION "ap-southeast-2";
ALTER DATABASE repartition_privs DROP REGION "us-east-1"

subtest revalidate_privs

user root

statement ok
CREATE DATABASE revalidate_privs PRIMARY REGION "ca-central-1" REGIONS "us-east-1"

statement ok
CREATE TABLE revalidate_privs.rbr () LOCALITY REGIONAL BY ROW

user testuser

statement ok
USE revalidate_privs

# Check that revalidate_unique_constraint* builtins respect privileges.
query error user testuser does not have SELECT privilege on relation rbr
SELECT crdb_internal.revalidate_unique_constraints_in_all_tables()

query error user testuser does not have SELECT privilege on relation rbr
SELECT crdb_internal.revalidate_unique_constraints_in_table('revalidate_privs.rbr')

query error user testuser does not have SELECT privilege on relation rbr
SELECT crdb_internal.revalidate_unique_constraint('revalidate_privs.rbr', 'rbr_pkey')

user root

statement ok
GRANT SELECT ON revalidate_privs.rbr TO testuser

user testuser

query T
SELECT crdb_internal.revalidate_unique_constraints_in_all_tables()
----
·

query T
SELECT crdb_internal.revalidate_unique_constraints_in_table('repartition_privs.rbr')
----
·

query T
SELECT crdb_internal.revalidate_unique_constraint('repartition_privs.rbr', 'rbr_pkey')
----
·
25 changes: 24 additions & 1 deletion pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4662,15 +4662,38 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) {
wg.Wait()
}

// disablingClientStream allows delaying rRPC messages based on a user provided
// function. It is OK to arbitrarily delay messages, but if they are dropped, it
// breaks application level expectations of in-order delivery. By returning nil
// immediately, and then sending when the stream is not disabled, we don't break
// the SendMsg contract. Note that this test is still a little too high level in
// the sense that it is blocking at the gRPC layer, and not the TCP layer, but
// that is much more complex to fully implement, and this should get equivalent
// results.
type disablingClientStream struct {
grpc.ClientStream
disabled func() bool
wasDisabled bool
buffer []interface{}
disabled func() bool
}

func (cs *disablingClientStream) SendMsg(m interface{}) error {
// When the stream is disabled, buffer all the messages, but don't send.
if cs.disabled() {
cs.buffer = append(cs.buffer, m)
cs.wasDisabled = true
return nil
}
// Now that it transitioned from disabled to not disabled, flush all the
// messages out in the same order as originally expected.
if cs.wasDisabled {
for _, buf := range cs.buffer {
_ = cs.ClientStream.SendMsg(buf)
}
cs.buffer = nil
cs.wasDisabled = false
}

return cs.ClientStream.SendMsg(m)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func TestReadEnvironmentVariables(t *testing.T) {
cfg.AmbientCtx.Tracer = nil
cfgExpected.Tracer = nil
cfgExpected.AmbientCtx.Tracer = nil
// Temp storage disk monitors will have slightly different names, so we
// override them to point to the same one.
cfgExpected.TempStorageConfig.Mon = cfg.TempStorageConfig.Mon
require.Equal(t, cfgExpected, cfg)

// Set all the environment variables to valid values and ensure they are set
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func (n *alterTableNode) startExec(params runParams) error {
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.ExecCfg().InternalExecutorFactory(
params.ctx, params.SessionData(),
), n.tableDesc, params.p.Txn(), name,
), n.tableDesc, params.p.Txn(), params.p.User(), name,
); err != nil {
return err
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand Down Expand Up @@ -763,7 +764,9 @@ func (sc *SchemaChanger) validateConstraints(
return err
}
} else if c.IsUniqueWithoutIndex() {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, c.GetName()); err != nil {
if err := validateUniqueWithoutIndexConstraintInTxn(
ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, evalCtx.SessionData().User(), c.GetName(),
); err != nil {
return err
}
} else if c.IsNotNull() {
Expand Down Expand Up @@ -1941,6 +1944,7 @@ func countIndexRowsAndMaybeCheckUniqueness(
idx.GetPredicate(),
ie,
txn,
username.NodeUserName(),
false, /* preExisting */
); err != nil {
return err
Expand Down Expand Up @@ -2416,7 +2420,7 @@ func runSchemaChangesInTxn(
uwi := &c.ConstraintToUpdateDesc().UniqueWithoutIndexConstraint
if uwi.Validity == descpb.ConstraintValidity_Validating {
if err := validateUniqueWithoutIndexConstraintInTxn(
ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, c.GetName(),
ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, planner.User(), c.GetName(),
); err != nil {
return err
}
Expand Down Expand Up @@ -2575,6 +2579,7 @@ func validateUniqueWithoutIndexConstraintInTxn(
ie sqlutil.InternalExecutor,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
user username.SQLUsername,
constraintName string,
) error {
var syntheticDescs []catalog.Descriptor
Expand All @@ -2596,7 +2601,15 @@ func validateUniqueWithoutIndexConstraintInTxn(

return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateUniqueConstraint(
ctx, tableDesc, uc.Name, uc.ColumnIDs, uc.Predicate, ie, txn, false, /* preExisting */
ctx,
tableDesc,
uc.Name,
uc.ColumnIDs,
uc.Predicate,
ie,
txn,
user,
false, /* preExisting */
)
})
}
Expand Down
25 changes: 20 additions & 5 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -381,7 +382,9 @@ func (p *planner) RevalidateUniqueConstraintsInCurrentDB(ctx context.Context) er
}

for _, tableDesc := range tableDescs {
if err = RevalidateUniqueConstraintsInTable(ctx, p.Txn(), p.ExecCfg().InternalExecutor, tableDesc); err != nil {
if err = RevalidateUniqueConstraintsInTable(
ctx, p.Txn(), p.User(), p.ExecCfg().InternalExecutor, tableDesc,
); err != nil {
return err
}
}
Expand All @@ -402,7 +405,9 @@ func (p *planner) RevalidateUniqueConstraintsInTable(ctx context.Context, tableI
if err != nil {
return err
}
return RevalidateUniqueConstraintsInTable(ctx, p.Txn(), p.ExecCfg().InternalExecutor, tableDesc)
return RevalidateUniqueConstraintsInTable(
ctx, p.Txn(), p.User(), p.ExecCfg().InternalExecutor, tableDesc,
)
}

// RevalidateUniqueConstraint verifies that the given unique constraint on the
Expand Down Expand Up @@ -438,6 +443,7 @@ func (p *planner) RevalidateUniqueConstraint(
index.GetPredicate(),
p.ExecCfg().InternalExecutor,
p.Txn(),
p.User(),
true, /* preExisting */
)
}
Expand All @@ -457,6 +463,7 @@ func (p *planner) RevalidateUniqueConstraint(
uc.Predicate,
p.ExecCfg().InternalExecutor,
p.Txn(),
p.User(),
true, /* preExisting */
)
}
Expand Down Expand Up @@ -531,7 +538,11 @@ func HasVirtualUniqueConstraints(tableDesc catalog.TableDescriptor) bool {
// enforced by an index. This includes implicitly partitioned UNIQUE indexes
// and UNIQUE WITHOUT INDEX constraints.
func RevalidateUniqueConstraintsInTable(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, tableDesc catalog.TableDescriptor,
ctx context.Context,
txn *kv.Txn,
user username.SQLUsername,
ie sqlutil.InternalExecutor,
tableDesc catalog.TableDescriptor,
) error {
// Check implicitly partitioned UNIQUE indexes.
for _, index := range tableDesc.ActiveIndexes() {
Expand All @@ -544,6 +555,7 @@ func RevalidateUniqueConstraintsInTable(
index.GetPredicate(),
ie,
txn,
user,
true, /* preExisting */
); err != nil {
log.Errorf(ctx, "validation of unique constraints failed for table %s: %s", tableDesc.GetName(), err)
Expand All @@ -563,6 +575,7 @@ func RevalidateUniqueConstraintsInTable(
uc.Predicate,
ie,
txn,
user,
true, /* preExisting */
); err != nil {
log.Errorf(ctx, "validation of unique constraints failed for table %s: %s", tableDesc.GetName(), err)
Expand Down Expand Up @@ -591,6 +604,7 @@ func validateUniqueConstraint(
pred string,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
user username.SQLUsername,
preExisting bool,
) error {
query, colNames, err := duplicateRowQuery(
Expand All @@ -607,8 +621,9 @@ func validateUniqueConstraint(
query,
)

values, err := ie.QueryRowEx(ctx, "validate unique constraint", txn,
sessiondata.NodeUserSessionDataOverride, query)
sessionDataOverride := sessiondata.NoSessionDataOverride
sessionDataOverride.User = user
values, err := ie.QueryRowEx(ctx, "validate unique constraint", txn, sessionDataOverride, query)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}

if err := r.checkVirtualConstraints(ctx, p.ExecCfg(), r.job); err != nil {
if err := r.checkVirtualConstraints(ctx, p.ExecCfg(), r.job, p.User()); err != nil {
return err
}

Expand Down Expand Up @@ -1103,8 +1103,8 @@ func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.Executo

// checkVirtualConstraints checks constraints that are enforced via runtime
// checks, such as uniqueness checks that are not directly backed by an index.
func (*importResumer) checkVirtualConstraints(
ctx context.Context, execCfg *sql.ExecutorConfig, job *jobs.Job,
func (r *importResumer) checkVirtualConstraints(
ctx context.Context, execCfg *sql.ExecutorConfig, job *jobs.Job, user username.SQLUsername,
) error {
for _, tbl := range job.Details().(jobspb.ImportDetails).Tables {
desc := tabledesc.NewBuilder(tbl.Desc).BuildExistingMutableTable()
Expand All @@ -1121,7 +1121,7 @@ func (*importResumer) checkVirtualConstraints(
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := execCfg.InternalExecutorFactory(ctx, sql.NewFakeSessionData(execCfg.SV()))
return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
return sql.RevalidateUniqueConstraintsInTable(ctx, txn, ie, desc)
return sql.RevalidateUniqueConstraintsInTable(ctx, txn, user, ie, desc)
})
}); err != nil {
return err
Expand Down
22 changes: 14 additions & 8 deletions pkg/util/mon/bytes_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ type BytesMonitor struct {

// name identifies this monitor in logging messages.
name redact.RedactableString
// nameWithPointer contains name with the address of the monitor attached to
// it. This can be used in logging messages to uniquely identify all
// messages for a single monitor.
nameWithPointer redact.RedactableString

// resource specifies what kind of resource the monitor is tracking
// allocations for. Specific behavior is delegated to this resource (e.g.
Expand Down Expand Up @@ -307,6 +311,7 @@ func NewMonitorWithLimit(
poolAllocationSize: increment,
settings: settings,
}
m.nameWithPointer = redact.Sprintf("%s (%p)", name, redact.Safe(m))
m.mu.curBytesCount = curCount
m.mu.maxBytesHist = maxHist
return m
Expand Down Expand Up @@ -365,10 +370,10 @@ func (mm *BytesMonitor) Start(ctx context.Context, pool *BytesMonitor, reserved
if log.V(2) {
poolname := redact.RedactableString("(none)")
if pool != nil {
poolname = pool.name
poolname = pool.nameWithPointer
}
log.InfofDepth(ctx, 1, "%s: starting monitor, reserved %s, pool %s",
mm.name,
mm.nameWithPointer,
humanizeutil.IBytes(mm.reserved.used),
poolname)
}
Expand Down Expand Up @@ -398,6 +403,7 @@ func NewUnlimitedMonitor(
reserved: NewStandaloneBudget(math.MaxInt64),
settings: settings,
}
m.nameWithPointer = redact.Sprintf("%s (%p)", name, redact.Safe(m))
m.mu.curBytesCount = curCount
m.mu.maxBytesHist = maxHist
return m
Expand Down Expand Up @@ -427,7 +433,7 @@ func (mm *BytesMonitor) doStop(ctx context.Context, check bool) {
// monitor is not shared any more.
if log.V(1) && mm.mu.maxAllocated >= bytesMaxUsageLoggingThreshold {
log.InfofDepth(ctx, 1, "%s, bytes usage max %s",
mm.name,
mm.nameWithPointer,
humanizeutil.IBytes(mm.mu.maxAllocated))
}

Expand Down Expand Up @@ -768,7 +774,7 @@ func (mm *BytesMonitor) reserveBytes(ctx context.Context, x int64) error {
// many small allocations.
if bits.Len64(uint64(mm.mu.curAllocated)) != bits.Len64(uint64(mm.mu.curAllocated-x)) {
log.Infof(ctx, "%s: bytes usage increases to %s (+%d)",
mm.name,
mm.nameWithPointer,
humanizeutil.IBytes(mm.mu.curAllocated), x)
}
}
Expand All @@ -778,7 +784,7 @@ func (mm *BytesMonitor) reserveBytes(ctx context.Context, x int64) error {
// We avoid VEventf here because we want to avoid computing the
// trace string if there is nothing to log.
log.Infof(ctx, "%s: now at %d bytes (+%d) - %s",
mm.name, mm.mu.curAllocated, x, util.GetSmallTrace(3))
mm.nameWithPointer, mm.mu.curAllocated, x, util.GetSmallTrace(3))
}
return nil
}
Expand All @@ -804,7 +810,7 @@ func (mm *BytesMonitor) releaseBytes(ctx context.Context, sz int64) {
// We avoid VEventf here because we want to avoid computing the
// trace string if there is nothing to log.
log.Infof(ctx, "%s: now at %d bytes (-%d) - %s",
mm.name, mm.mu.curAllocated, sz, util.GetSmallTrace(5))
mm.nameWithPointer, mm.mu.curAllocated, sz, util.GetSmallTrace(5))
}
}

Expand All @@ -820,7 +826,7 @@ func (mm *BytesMonitor) increaseBudget(ctx context.Context, minExtra int64) erro
)
}
if log.V(2) {
log.Infof(ctx, "%s: requesting %d bytes from the pool", mm.name, minExtra)
log.Infof(ctx, "%s: requesting %d bytes from the pool", mm.nameWithPointer, minExtra)
}

return mm.mu.curBudget.Grow(ctx, minExtra)
Expand All @@ -844,7 +850,7 @@ func (mm *BytesMonitor) roundSize(sz int64) int64 {
func (mm *BytesMonitor) releaseBudget(ctx context.Context) {
// NB: mm.mu need not be locked here, as this is only called from StopMonitor().
if log.V(2) {
log.Infof(ctx, "%s: releasing %d bytes to the pool", mm.name, mm.mu.curBudget.allocated())
log.Infof(ctx, "%s: releasing %d bytes to the pool", mm.nameWithPointer, mm.mu.curBudget.allocated())
}
mm.mu.curBudget.Clear(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/smalltrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func GetSmallTrace(skip int) redact.RedactableString {
if index := strings.LastIndexByte(file, '/'); index >= 0 {
file = file[index+1:]
}
callers.Printf("%s%s:%d:%s", callerPrefix, file, f.Line, function)
callers.Printf("%s%s:%d:%s", callerPrefix, redact.SafeString(file), f.Line, redact.SafeString(function))
callerPrefix = ","
if !more {
break
Expand Down
Loading

0 comments on commit cc22360

Please sign in to comment.