Merge #85769 #85931

85769: sql, server: regulate access to remaining observability features r=Santamaura a=Santamaura

This change controls access to various observability
features based on system privileges, including the following:
- the admin UI databases/tables/schema endpoints require admin or VIEWACTIVITY
- EXPERIMENTAL_AUDIT requires admin or MODIFYCLUSTERSETTING
- SQL login requires not having NOSQLLOGIN or the equivalent
role option

Resolves: #83848, #83863, #83862

Release note (security update): Changed the requirements for accessing
some observability features. The databases/tables/schema endpoints in
the admin UI require admin or VIEWACTIVITY. EXPERIMENTAL_AUDIT requires
admin or MODIFYCLUSTERSETTING. SQL login requires not having NOSQLLOGIN
or the equivalent role option.
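
As a rough illustration of the gating pattern described above (not the actual CockroachDB server code), the standalone Go sketch below uses hypothetical `user` and `databases` types; only the idea of requiring admin or the VIEWACTIVITY system privilege before serving endpoint data comes from this change. The real wiring goes through the `requireViewActivityPermission` helper visible in the `pkg/server/admin.go` diff further down.

```go
package main

import (
	"errors"
	"fmt"
)

// user is a hypothetical stand-in for the requesting SQL user; the real
// server resolves this from the request context.
type user struct {
	name        string
	isAdmin     bool
	systemPrivs map[string]bool // e.g. "VIEWACTIVITY"
}

var errInsufficientPrivilege = errors.New("requires admin or the VIEWACTIVITY system privilege")

// requireAdminOrViewActivity models the check this change places in front of
// the databases/tables/schema endpoints: admins pass, everyone else needs
// the VIEWACTIVITY system privilege.
func requireAdminOrViewActivity(u user) error {
	if u.isAdmin || u.systemPrivs["VIEWACTIVITY"] {
		return nil
	}
	return errInsufficientPrivilege
}

// databases is a toy endpoint that performs the check before returning data.
func databases(u user) ([]string, error) {
	if err := requireAdminOrViewActivity(u); err != nil {
		return nil, err
	}
	return []string{"defaultdb", "postgres"}, nil
}

func main() {
	for _, u := range []user{
		{name: "root", isAdmin: true},
		{name: "analyst", systemPrivs: map[string]bool{"VIEWACTIVITY": true}},
		{name: "stranger"},
	} {
		dbs, err := databases(u)
		fmt.Printf("%s: dbs=%v err=%v\n", u.name, dbs, err)
	}
}
```

The same shape applies to the DatabaseDetails, TableDetails, TableStats, and NonTableStats handlers in the diff below.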

85931: ccl/sqlproxyccl: ensure that connections cannot be transferred before initialization r=JeffSwenson a=jaylim-crl

Related to #80446.

In #80446, we updated the connection tracker to track server assignments
instead of forwarders. This meant it was possible to start transferring a
connection before the forwarder had been resumed for the first time,
breaking the TransferConnection invariant that the processors must be
resumed before it is called.

This commit fixes that issue by introducing a new isInitialized flag on the
forwarder, which is only set to true once run returns. Attempting to
transfer a connection with isInitialized=false returns an error. This
should fix the flakes we have been seeing in CI.

Release note: None

Release justification: sqlproxy bug fix. This ensures that we don't resume
the processors mid connection transfer, which could cause unexpected issues
on the client's end. Note that this situation is rare, since it requires
forwarder.Run and forwarder.TransferConnection to run at nearly the same
time.
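
For context, here is a minimal, self-contained Go sketch (not the actual sqlproxy code) of the invariant this fix enforces. `forwarderSketch` and its method bodies are hypothetical stand-ins, while the `isInitialized` flag, `errTransferCannotStart`, and the run/TransferConnection relationship mirror the change in `pkg/ccl/sqlproxyccl` shown below.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTransferCannotStart = errors.New("transfer cannot be started")

// forwarderSketch is a hypothetical, stripped-down model of the proxy
// forwarder; the real type lives in pkg/ccl/sqlproxyccl/forwarder.go.
type forwarderSketch struct {
	mu struct {
		sync.Mutex
		isInitialized  bool // set only after run has resumed the processors
		isTransferring bool
	}
}

// run stands in for forwarder.run: it would set up and resume the request
// and response processors, and only then mark the forwarder as initialized,
// making it eligible for a connection transfer.
func (f *forwarderSketch) run() error {
	// ... resume processors here ...
	f.mu.Lock()
	defer f.mu.Unlock()
	f.mu.isInitialized = true
	return nil
}

// TransferConnection refuses to start a transfer until the forwarder has
// been initialized, preserving the "processors resumed first" invariant.
func (f *forwarderSketch) TransferConnection() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if !f.mu.isInitialized || f.mu.isTransferring {
		return errTransferCannotStart
	}
	f.mu.isTransferring = true
	// ... migrate the connection, then clear isTransferring ...
	return nil
}

func main() {
	f := &forwarderSketch{}
	fmt.Println(f.TransferConnection()) // transfer cannot be started
	_ = f.run()
	fmt.Println(f.TransferConnection()) // <nil>
}
```

In the real forwarder, tryBeginTransfer performs the isInitialized/isTransferring check under f.mu, and run only flips the flag after resumeProcessors succeeds.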

Co-authored-by: Santamaura <[email protected]>
Co-authored-by: Jay <[email protected]>
3 people committed Aug 11, 2022
3 parents 8e3ee57 + 9e5c7a0 + ae851fa commit 5845f4b
Showing 15 changed files with 201 additions and 57 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/sqlproxyccl/conn_migration.go
@@ -78,7 +78,7 @@ func (f *forwarder) tryBeginTransfer() (started bool, cleanupFn func()) {
defer f.mu.Unlock()

// Forwarder hasn't been initialized.
if !f.isInitializedLocked() {
if !f.mu.isInitialized {
return false, nil
}

@@ -120,9 +120,9 @@ var errTransferCannotStart = errors.New("transfer cannot be started")
// where the forwarder is not in a state that is eligible for a connection
// migration.
//
// NOTE: If the forwarder hasn't been closed, runTransfer has an invariant
// NOTE: If the forwarder hasn't been closed, TransferConnection has an invariant
// where the processors have been resumed prior to calling this method. When
// runTransfer returns, it is guaranteed that processors will either be
// TransferConnection returns, it is guaranteed that processors will either be
// re-resumed, or the forwarder will be closed (in the case of a non-recoverable
// error).
//
@@ -145,7 +145,7 @@ func (f *forwarder) TransferConnection() (retErr error) {
// Create a transfer context, and timeout handler which gets triggered
// whenever the context expires. We have to close the forwarder because
// the transfer may be blocked on I/O, and the only way for now is to close
// the connections. This then allow runTransfer to return and cleanup.
// the connections. This then allow TransferConnection to return and cleanup.
ctx, cancel := newTransferContext(f.ctx)
defer cancel()

@@ -177,8 +177,8 @@ func (f *forwarder) TransferConnection() (retErr error) {
latencyDur := timeutil.Since(tBegin)
f.metrics.ConnMigrationAttemptedLatency.RecordValue(latencyDur.Nanoseconds())

// When runTransfer returns, it's either the forwarder has been closed,
// or the procesors have been resumed.
// When TransferConnection returns, it's either the forwarder has been
// closed, or the procesors have been resumed.
if !ctx.isRecoverable() {
log.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr)
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
2 changes: 2 additions & 0 deletions pkg/ccl/sqlproxyccl/conn_migration_test.go
@@ -91,6 +91,7 @@ func TestForwarder_tryBeginTransfer(t *testing.T) {
f := &forwarder{}
f.mu.request = &processor{}
f.mu.response = &processor{}
f.mu.isInitialized = true

started, cleanupFn := f.tryBeginTransfer()
require.False(t, started)
@@ -107,6 +108,7 @@ func TestForwarder_tryBeginTransfer(t *testing.T) {
f := &forwarder{}
f.mu.request = &processor{}
f.mu.response = &processor{}
f.mu.isInitialized = true

started, cleanupFn := f.tryBeginTransfer()
require.True(t, started)
45 changes: 32 additions & 13 deletions pkg/ccl/sqlproxyccl/forwarder.go
@@ -71,6 +71,17 @@ type forwarder struct {
mu struct {
syncutil.Mutex

// isInitialized indicates that the forwarder has been initialized.
//
// TODO(jaylim-crl): This prevents the connection from being transferred
// before we fully resume the processors (because the balancer now
// tracks assignments instead of forwarders). If we don't do this, there
// could be a situation where we resume the processors mid transfer. One
// alternative idea is to replace both isInitialized and isTransferring
// with a lock, which is held by the owner of the forwarder (e.g. main
// thread, or connection migrator thread).
isInitialized bool

// isTransferring indicates that a connection migration is in progress.
isTransferring bool

@@ -154,7 +165,7 @@ func newForwarder(
//
// run can only be called once throughout the lifetime of the forwarder.
func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error {
initialize := func() error {
setup := func() error {
f.mu.Lock()
defer f.mu.Unlock()

@@ -165,8 +176,9 @@ func (f *forwarder) run(clientConn net.Conn, serverConn net.Conn) error {
return f.ctx.Err()
}

// Run can only be called once.
if f.isInitializedLocked() {
// Run can only be called once. If lastUpdated has already been set
// (i.e. non-zero), it has to be the case where run has been called.
if !f.mu.activity.lastUpdated.IsZero() {
return errors.AssertionFailedf("forwarder has already been started")
}

@@ -185,10 +197,23 @@ f.mu.activity.lastUpdated = f.timeSource.Now()
f.mu.activity.lastUpdated = f.timeSource.Now()
return nil
}
if err := initialize(); err != nil {
return err
markInitialized := func() {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.isInitialized = true
}

if err := setup(); err != nil {
return errors.Wrap(err, "setting up forwarder")
}
return f.resumeProcessors()

if err := f.resumeProcessors(); err != nil {
return errors.Wrap(err, "resuming processors")
}

// Mark the forwarder as initialized, and connection is ready for a transfer.
markInitialized()
return nil
}

// Context returns the context associated with the forwarder.
@@ -237,7 +262,7 @@ func (f *forwarder) IsIdle() (idle bool) {
defer f.mu.Unlock()

// If the forwarder hasn't been initialized, it is considered active.
if !f.isInitializedLocked() {
if !f.mu.isInitialized {
return false
}

@@ -270,12 +295,6 @@ func (f *forwarder) IsIdle() (idle bool) {
return now.Sub(f.mu.activity.lastUpdated) >= idleTimeout
}

// isInitializedLocked returns true if the forwarder has been initialized
// through Run, or false otherwise.
func (f *forwarder) isInitializedLocked() bool {
return f.mu.request != nil && f.mu.response != nil
}

// resumeProcessors starts both the request and response processors
// asynchronously. The forwarder will be closed if any of the processors
// return an error while resuming. This is idempotent as resume() will return
16 changes: 16 additions & 0 deletions pkg/ccl/sqlproxyccl/forwarder_test.go
@@ -44,6 +44,12 @@ func TestForward(t *testing.T) {
err := f.run(p1, p2)
require.NoError(t, err)

func() {
f.mu.Lock()
defer f.mu.Unlock()
require.True(t, f.mu.isInitialized)
}()

// Close the connection right away to simulate processor error.
p1.Close()

@@ -77,6 +83,11 @@ func TestForward(t *testing.T) {
require.NoError(t, err)
require.Nil(t, f.ctx.Err())
require.False(t, f.IsIdle())
func() {
f.mu.Lock()
defer f.mu.Unlock()
require.True(t, f.mu.isInitialized)
}()

f.mu.Lock()
requestProc := f.mu.request
@@ -217,6 +228,11 @@ func TestForward(t *testing.T) {
require.NoError(t, err)
require.Nil(t, f.ctx.Err())
require.False(t, f.IsIdle())
func() {
f.mu.Lock()
defer f.mu.Unlock()
require.True(t, f.mu.isInitialized)
}()

f.mu.Lock()
responseProc := f.mu.response
23 changes: 2 additions & 21 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -837,11 +837,6 @@ func TestConnectionRebalancingDisabled(t *testing.T) {
const podCount = 2
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, base.TestingKnobs{})
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register one SQL pod in the directory server.
tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
Expand Down Expand Up @@ -934,11 +929,6 @@ func TestCancelQuery(t *testing.T) {
},
}
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, tenantKnobs)
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Use a custom time source for testing.
t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
@@ -1272,11 +1262,6 @@ func TestPodWatcher(t *testing.T) {
const podCount = 4
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount, base.TestingKnobs{})
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register only 3 SQL pods in the directory server. We will add the 4th
// once the watcher has been established.
@@ -1739,11 +1724,6 @@ func TestCurConnCountMetric(t *testing.T) {
// Start a single SQL pod.
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, 1, base.TestingKnobs{})
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register the SQL pod in the directory server.
tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
@@ -2295,7 +2275,8 @@ func queryAddr(ctx context.Context, t *testing.T, db queryer) string {

// startTestTenantPods starts count SQL pods for the given tenant, and returns
// a list of tenant servers. Note that a default admin testuser with the
// password hunter2 will be created.
// password hunter2 will be created. The test tenants will automatically be
// stopped once the server's stopper (from ts) is stopped.
func startTestTenantPods(
ctx context.Context,
t *testing.T,
@@ -344,7 +344,7 @@ func (d *TestStaticDirectoryServer) RemovePod(tenantID roachpb.TenantID, podAddr
}

// Start starts the test directory server using an in-memory listener. This
// returns an error if the server cannot be started. If the sevrer has already
// returns an error if the server cannot be started. If the server has already
// been started, this is a no-op.
func (d *TestStaticDirectoryServer) Start(ctx context.Context) error {
d.process.Lock()
22 changes: 20 additions & 2 deletions pkg/server/admin.go
@@ -249,6 +249,10 @@ func (s *adminServer) Databases(
return nil, serverError(ctx, err)
}

if err := s.requireViewActivityPermission(ctx); err != nil {
return nil, err
}

r, err := s.databasesHelper(ctx, req, sessionUser, 0, 0)
return r, maybeHandleNotFoundError(ctx, err)
}
@@ -315,6 +319,10 @@ func (s *adminServer) DatabaseDetails(
return nil, serverError(ctx, err)
}

if err := s.requireViewActivityPermission(ctx); err != nil {
return nil, err
}

r, err := s.databaseDetailsHelper(ctx, req, userName)
return r, maybeHandleNotFoundError(ctx, err)
}
@@ -679,6 +687,10 @@ func (s *adminServer) TableDetails(
return nil, serverError(ctx, err)
}

if err := s.requireViewActivityPermission(ctx); err != nil {
return nil, err
}

r, err := s.tableDetailsHelper(ctx, req, userName)
return r, maybeHandleNotFoundError(ctx, err)
}
@@ -1075,7 +1087,13 @@ func (s *adminServer) TableStats(
ctx context.Context, req *serverpb.TableStatsRequest,
) (*serverpb.TableStatsResponse, error) {
ctx = s.server.AnnotateCtx(ctx)
userName, err := s.requireAdminUser(ctx)

userName, err := userFromContext(ctx)
if err != nil {
return nil, serverError(ctx, err)
}

err = s.requireViewActivityPermission(ctx)
if err != nil {
// NB: not using serverError() here since the priv checker
// already returns a proper gRPC error status.
@@ -1105,7 +1123,7 @@ func (s *adminServer) NonTableStats(
ctx context.Context, req *serverpb.NonTableStatsRequest,
) (*serverpb.NonTableStatsResponse, error) {
ctx = s.server.AnnotateCtx(ctx)
if _, err := s.requireAdminUser(ctx); err != nil {
if err := s.requireViewActivityPermission(ctx); err != nil {
// NB: not using serverError() here since the priv checker
// already returns a proper gRPC error status.
return nil, err
9 changes: 9 additions & 0 deletions pkg/server/admin_test.go
@@ -387,6 +387,15 @@ func TestAdminAPIDatabases(t *testing.T) {
if _, err := db.Exec(query); err != nil {
t.Fatal(err)
}
// Non admins now also require VIEWACTIVITY.
query = fmt.Sprintf(
"GRANT SYSTEM %s TO %s",
"VIEWACTIVITY",
authenticatedUserNameNoAdmin().SQLIdentifier(),
)
if _, err := db.Exec(query); err != nil {
t.Fatal(err)
}

for _, tc := range []struct {
expectedDBs []string
2 changes: 1 addition & 1 deletion pkg/server/index_usage_stats.go
@@ -194,7 +194,7 @@ func (s *statusServer) TableIndexStats(
ctx = propagateGatewayMetadata(ctx)
ctx = s.AnnotateCtx(ctx)

if err := s.privilegeChecker.requireViewActivityOrViewActivityRedactedPermission(ctx); err != nil {
if err := s.privilegeChecker.requireViewActivityPermission(ctx); err != nil {
return nil, err
}
return getTableIndexUsageStats(ctx, req, s.sqlServer.pgServer.SQLServer.GetLocalIndexStatistics(),
25 changes: 23 additions & 2 deletions pkg/sql/alter_table.go
@@ -18,6 +18,7 @@ import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -35,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/volatility"
@@ -44,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/storageparam"
"github.com/cockroachdb/cockroach/pkg/sql/storageparam/tablestorageparam"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
@@ -908,10 +911,28 @@ func (p *planner) setAuditMode(
p.curPlan.auditEvents = append(p.curPlan.auditEvents,
auditEvent{desc: desc, writing: true})

// We require root for now. Later maybe use a different permission?
if err := p.RequireAdminRole(ctx, "change auditing settings on a table"); err != nil {
// Requires admin or MODIFYCLUSTERSETTING as of 22.2
hasAdmin, err := p.HasAdminRole(ctx)
if err != nil {
return false, err
}
if !hasAdmin {
// Check for system privilege first, otherwise fall back to role options.
hasModify := false
if p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.SystemPrivilegesTable) {
hasModify = p.CheckPrivilege(ctx, syntheticprivilege.GlobalPrivilegeObject, privilege.MODIFYCLUSTERSETTING) == nil
}
if !hasModify {
hasModify, err = p.HasRoleOption(ctx, roleoption.MODIFYCLUSTERSETTING)
if err != nil {
return false, err
}
if !hasModify {
return false, pgerror.Newf(pgcode.InsufficientPrivilege,
"only users with admin or %s system privilege are allowed to change audit settings on a table ", privilege.MODIFYCLUSTERSETTING.String())
}
}
}

telemetry.Inc(sqltelemetry.SchemaSetAuditModeCounter(auditMode.TelemetryName()))

2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/alter_table
@@ -850,7 +850,7 @@ statement ok
ALTER TABLE audit ADD COLUMN y INT

# But not the audit settings.
statement error change auditing settings on a table
statement error pq: only users with admin or MODIFYCLUSTERSETTING system privilege are allowed to change audit settings on a table
ALTER TABLE audit EXPERIMENTAL_AUDIT SET OFF

user root