Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
117505: roachtest: assign adminui ports dynamically for virtual clusters r=srosenberg,renatolabs a=DarrylWong

This was originally removed in #115599 due to #114097 merging, but adminui was reverted in #117141 and mistakenly did not revert the special case for virtual clusters. We unskip the multitenant/distsql tests as well.

Release note: None
Fixes: #117150
Fixes: #117149
Epic: None

117545: rpc: rm rangefeed RPC stream window special case r=erikgrinaker,miretskiy a=pav-kv

The rangefeed stream window size tuning was introduced to mitigate OOM in rangefeeds caused by the excessive number of streams (one per `Range`). Since we now use mux rangefeeds (which multiplexes all the rangefeed traffic into a single stream), this setting is no longer needed, so this commit removes it.

Part of #108992

Release note (ops change): `COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE` env variable has been removed, and rangefeed connection now uses the same window size as other RPC connections.

117554: sqlproxyccl: improve authentication throttle error r=JeffSwenson a=JeffSwenson

The sql proxy will throttle connection attempts if a (client IP, tenant cluster) pair has too many authentication failures. The error is usually caused by a misconfigured password in a connection pool. This change replaces the "connection attempt throttled" error message with "too many failed authentication attempts". There is a hint that includes this message but not all drivers are configured to log hints.

Fixes #117552

Co-authored-by: DarrylWong <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Jeff <[email protected]>
  • Loading branch information
4 people committed Jan 10, 2024
4 parents 40247b5 + f6519af + d91c925 + 480882f commit 05d1395
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 28 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/sqlproxyccl/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestAuthenticateThrottled(t *testing.T) {
require.Equal(t, msg, &pgproto3.ErrorResponse{
Severity: "FATAL",
Code: "08C00",
Message: "codeProxyRefusedConnection: connection attempt throttled",
Message: "codeProxyRefusedConnection: too many failed authentication attempts",
Hint: throttledErrorHint,
})

Expand All @@ -142,10 +142,10 @@ func TestAuthenticateThrottled(t *testing.T) {
_, err := authenticate(proxyToClient, proxyToServer, nil, /* proxyBackendKeyData */
func(status throttler.AttemptStatus) error {
require.Equal(t, throttler.AttemptInvalidCredentials, status)
return throttledError
return authThrottledError
})
require.Error(t, err)
require.Contains(t, err.Error(), "connection attempt throttled")
require.Contains(t, err.Error(), "too many failed authentication attempts")

proxyToServer.Close()
proxyToClient.Close()
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ const throttledErrorHint string = `Connection throttling is triggered by repeate
sure the username and password are correct.
`

var throttledError = errors.WithHint(
var authThrottledError = errors.WithHint(
withCode(errors.New(
"connection attempt throttled"), codeProxyRefusedConnection),
"too many failed authentication attempts"), codeProxyRefusedConnection),
throttledErrorHint)

// newProxyHandler will create a new proxy handler with configuration based on
Expand Down Expand Up @@ -432,7 +432,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn)
throttleTime, err := handler.throttleService.LoginCheck(throttleTags)
if err != nil {
log.Errorf(ctx, "throttler refused connection: %v", err.Error())
err = throttledError
err = authThrottledError
updateMetricsAndSendErrToClient(err, fe.Conn, handler.metrics)
return err
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn)
ctx, throttleTags, throttleTime, status,
); err != nil {
log.Errorf(ctx, "throttler refused connection after authentication: %v", err.Error())
return throttledError
return authThrottledError
}
return nil
},
Expand Down
6 changes: 6 additions & 0 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2490,6 +2490,12 @@ func (c *clusterImpl) ExternalAdminUIAddr(
return c.adminUIAddr(ctx, l, node, true)
}

func (c *clusterImpl) AdminUIPorts(
ctx context.Context, l *logger.Logger, nodes option.NodeListOption,
) ([]int, error) {
return roachprod.AdminPorts(ctx, l, c.MakeNodes(nodes), c.IsSecure())
}

func (c *clusterImpl) adminUIAddr(
ctx context.Context, l *logger.Logger, node option.NodeListOption, external bool,
) ([]string, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ type Cluster interface {
Conn(ctx context.Context, l *logger.Logger, node int, opts ...func(*option.ConnOption)) *gosql.DB
ConnE(ctx context.Context, l *logger.Logger, node int, opts ...func(*option.ConnOption)) (*gosql.DB, error)

// URLs for the Admin UI.
// URLs and Ports for the Admin UI.

InternalAdminUIAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
ExternalAdminUIAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
AdminUIPorts(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]int, error)

// Running commands on nodes.

Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/option/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func DefaultStartVirtualClusterOpts(tenantName string, sqlInstance int) StartOpt
startOpts.RoachprodOpts.Target = install.StartServiceForVirtualCluster
startOpts.RoachprodOpts.VirtualClusterName = tenantName
startOpts.RoachprodOpts.SQLInstance = sqlInstance
// TODO(DarrylWong): remove once #117125 is addressed.
startOpts.RoachprodOpts.AdminUIPort = 0
return startOpts
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/cmd/roachtest/tests/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,18 @@ SELECT count(replicas)
t.L().Printf("killing all nodes\n")
c.Stop(ctx, t.L(), option.DefaultStopOpts())

adminPorts, err := c.AdminUIPorts(ctx, t.L(), c.Node(1))
if err != nil {
t.Fatal(err)
}

// Restart node 1, but have it listen on a different port for internal
// connections. This will require node 1 to reach out to the other nodes in
// the cluster for gossip info.
err := c.RunE(ctx, c.Node(1),
err = c.RunE(ctx, c.Node(1),
` ./cockroach start --insecure --background --store={store-dir} `+
`--log-dir={log-dir} --cache=10% --max-sql-memory=10% `+
`--listen-addr=:$[{pgport:1}+1000] --http-port=$[{pgport:1}+1] `+
fmt.Sprintf(`--listen-addr=:$[{pgport:1}+1000] --http-port=%d `, adminPorts[0])+
`--join={pghost:1}:{pgport:1} `+
`--advertise-addr={pghost:1}:$[{pgport:1}+1000] `+
`> {log-dir}/cockroach.stdout 2> {log-dir}/cockroach.stderr`)
Expand Down
2 changes: 0 additions & 2 deletions pkg/cmd/roachtest/tests/multitenant_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ func registerMultiTenantDistSQL(r registry.Registry) {
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
Leases: registry.MetamorphicLeases,
// TODO(#117150): unskip when #117150 is fixed.
Skip: "#117150",
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runMultiTenantDistSQL(ctx, t, c, numInstances, b == "on", to)
},
Expand Down
22 changes: 22 additions & 0 deletions pkg/roachprod/roachprod.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,28 @@ func AdminURL(
return urlGenerator(ctx, c, l, c.TargetNodes(), uConfig)
}

// AdminPorts finds the AdminUI ports for a cluster.
func AdminPorts(
ctx context.Context, l *logger.Logger, clusterName string, secure bool,
) ([]int, error) {
if err := LoadClusters(); err != nil {
return nil, err
}
c, err := newCluster(l, clusterName, install.SecureOption(secure))
if err != nil {
return nil, err
}
var ports []int
for _, node := range c.Nodes {
port, err := c.NodeUIPort(ctx, node)
if err != nil {
return nil, errors.Wrapf(err, "Error discovering UI Port for node %d", node)
}
ports = append(ports, port)
}
return ports, nil
}

// PprofOpts specifies the options needed by Pprof().
type PprofOpts struct {
Heap bool
Expand Down
10 changes: 4 additions & 6 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,12 +1785,10 @@ func (rpcCtx *Context) dialOptsCommon(
dialOpts = append(dialOpts, grpc.WithDisableRetry())

// Configure the window sizes with optional env var overrides.
dialOpts = append(dialOpts, grpc.WithInitialConnWindowSize(rpcCtx.initialConnWindowSize(ctx)))
if class == RangefeedClass {
dialOpts = append(dialOpts, grpc.WithInitialWindowSize(rpcCtx.rangefeedInitialWindowSize(ctx)))
} else {
dialOpts = append(dialOpts, grpc.WithInitialWindowSize(rpcCtx.initialWindowSize(ctx)))
}
dialOpts = append(dialOpts,
grpc.WithInitialConnWindowSize(rpcCtx.initialConnWindowSize(ctx)),
grpc.WithInitialWindowSize(rpcCtx.initialWindowSize(ctx)),
)
unaryInterceptors := rpcCtx.clientUnaryInterceptors
unaryInterceptors = unaryInterceptors[:len(unaryInterceptors):len(unaryInterceptors)]
if rpcCtx.Knobs.UnaryClientInterceptor != nil {
Expand Down
10 changes: 0 additions & 10 deletions pkg/rpc/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ type windowSizeSettings struct {
initialWindowSize int32
// initialConnWindowSize is the initial window size for a connection.
initialConnWindowSize int32
// rangefeedInitialWindowSize is the initial window size for a RangeFeed RPC.
rangefeedInitialWindowSize int32
}
}

Expand All @@ -94,8 +92,6 @@ func (s *windowSizeSettings) maybeInit(ctx context.Context) {
if s.values.initialConnWindowSize > maximumWindowSize {
s.values.initialConnWindowSize = maximumWindowSize
}
s.values.rangefeedInitialWindowSize = getWindowSize(ctx,
"COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128KB */)
})
}

Expand All @@ -111,12 +107,6 @@ func (s *windowSizeSettings) initialConnWindowSize(ctx context.Context) int32 {
return s.values.initialConnWindowSize
}

// For a RangeFeed RPC.
func (s *windowSizeSettings) rangefeedInitialWindowSize(ctx context.Context) int32 {
s.maybeInit(ctx)
return s.values.rangefeedInitialWindowSize
}

// sourceAddr is the environment-provided local address for outgoing
// connections.
var sourceAddr = func() net.Addr {
Expand Down

0 comments on commit 05d1395

Please sign in to comment.