Skip to content

Commit

Permalink
server,roachpb: introduce basic span config rpcs
Browse files Browse the repository at this point in the history
Part of cockroachdb#67679; these RPCs are exposed through the `kvtenant.Connector`
interface for tenants and also sit on `pkg/server.(*Node)` for the host
tenant. The basic type in these RPCs is the `SpanConfig`, which is the
same as our existing `ZoneConfig` proto type but without any inheritance
business.

The RPCs are backed by the `system.span_configurations` system table
added in an earlier commit. Future PRs will wire a view of this table
into KV with an eye towards replacing our use of `config.SystemConfig`.

---

While here, we al introduce a `crdb_internal.pretty_span` builtin to
help with the readability of this table. In future PRs we'll make use of
this built-in for datadriven tests asserting on the state of the table.

Release note: None
  • Loading branch information
irfansharif committed Aug 18, 2021
1 parent 2c26b7b commit 584f602
Show file tree
Hide file tree
Showing 37 changed files with 4,339 additions and 599 deletions.
2 changes: 1 addition & 1 deletion pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ ALL_TESTS = [
"//pkg/migration/migrationcluster:migrationcluster_test",
"//pkg/migration/migrationmanager:migrationmanager_test",
"//pkg/migration/migrations:migrations_test",
"//pkg/roachpb:external_test",
"//pkg/roachpb:roachpb_test",
"//pkg/roachpb:string_test",
"//pkg/rpc/nodedialer:nodedialer_test",
"//pkg/rpc:rpc_test",
"//pkg/security/certmgr:certmgr_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsuppor
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/rpc",
"//pkg/server/serverpb",
"//pkg/spanconfig",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/log",
Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -108,6 +109,9 @@ var _ config.SystemConfigProvider = (*Connector)(nil)
// multi-region primitives.
var _ serverpb.RegionsServer = (*Connector)(nil)

// Connector is capable of accessing span configurations for secondary tenants.
var _ spanconfig.KVAccessor = (*Connector)(nil)

// NewConnector creates a new Connector.
// NOTE: Calling Start will set cfg.RPCContext.ClusterID.
func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
Expand Down Expand Up @@ -502,3 +506,42 @@ func (c *Connector) tryForgetClient(ctx context.Context, client roachpb.Internal
c.mu.client = nil
}
}

// GetSpanConfigEntriesFor implements the spanconfig.KVAccessor interface.
func (c *Connector) GetSpanConfigEntriesFor(
ctx context.Context, spans []roachpb.Span,
) ([][]roachpb.SpanConfigEntry, error) {
for ctx.Err() == nil {
client, err := c.getClient(ctx)
if err != nil {
continue
}
resp, err := client.GetSpanConfigs(ctx, &roachpb.GetSpanConfigsRequest{
Spans: spans,
})
if err != nil {
return nil, err
}
return resp.Unnest(), nil
}
return nil, ctx.Err()
}

// UpdateSpanConfigEntries implements the spanconfig.KVAccessor
// interface.
func (c *Connector) UpdateSpanConfigEntries(
ctx context.Context, update []roachpb.SpanConfigEntry, delete []roachpb.Span,
) error {
for ctx.Err() == nil {
client, err := c.getClient(ctx)
if err != nil {
continue
}
_, err = client.UpdateSpanConfigs(ctx, &roachpb.UpdateSpanConfigsRequest{
SpanConfigsToUpdate: update,
SpansToDelete: delete,
})
return err
}
return ctx.Err()
}
12 changes: 12 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ func (*mockServer) TokenBucket(
panic("unimplemented")
}

func (m *mockServer) GetSpanConfigs(
context.Context, *roachpb.GetSpanConfigsRequest,
) (*roachpb.GetSpanConfigsResponse, error) {
panic("unimplemented")
}

func (m *mockServer) UpdateSpanConfigs(
context.Context, *roachpb.UpdateSpanConfigsRequest,
) (*roachpb.UpdateSpanConfigsResponse, error) {
panic("unimplemented")
}

func gossipEventForClusterID(clusterID uuid.UUID) *roachpb.GossipSubscriptionEvent {
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeyClusterID,
Expand Down
1 change: 1 addition & 0 deletions pkg/config/zonepb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/sql/opt/cat",
"//pkg/sql/sem/tree",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
"@in_gopkg_yaml_v2//:yaml_v2",
Expand Down
104 changes: 104 additions & 0 deletions pkg/config/zonepb/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package zonepb

import (
"bytes"
"context"
"fmt"
"strings"
"time"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
)
Expand Down Expand Up @@ -1152,3 +1154,105 @@ func (c *Constraint) GetValue() string {
func (m *GCPolicy) TTL() time.Duration {
return time.Duration(m.TTLSeconds) * time.Second
}

// EnsureFullyHydrated returns an assertion error if the zone config is not
// fully hydrated. A fully hydrated zone configuration must have all required
// fields set, which are RangeMaxBytes, RangeMinBytes, GC, and NumReplicas.
func (z *ZoneConfig) EnsureFullyHydrated() error {
const unhydratedZoneConfigMessage = "expected hydrated zone config: %s unset"
if z.RangeMaxBytes == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "RangeMaxBytes")
}
if z.RangeMinBytes == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "RangeMinBytes")
}
if z.GC == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "GCPolicy")
}
if z.NumReplicas == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "NumReplicas")
}
return nil
}

// AsSpanConfig converts a fully hydrated zone configuration to an equivalent
// SpanConfig. It fatals if the zone config hasn't been fully hydrated (fields
// are expected to have been cascaded through parent zone configs).
func (z *ZoneConfig) AsSpanConfig() roachpb.SpanConfig {
spanConfig, err := z.toSpanConfig()
if err != nil {
log.Fatalf(context.Background(), "%v", err)
}
return spanConfig
}

func (z *ZoneConfig) toSpanConfig() (roachpb.SpanConfig, error) {
var sc roachpb.SpanConfig
var err error

if err = z.EnsureFullyHydrated(); err != nil {
return sc, err
}

// Copy over the values.
sc.RangeMinBytes = *z.RangeMinBytes
sc.RangeMaxBytes = *z.RangeMaxBytes
sc.GCTTL = z.GC.TTLSeconds

// GlobalReads is false by default.
if z.GlobalReads != nil {
sc.GlobalReads = *z.GlobalReads
}
sc.NumReplicas = *z.NumReplicas
if z.NumVoters != nil {
sc.NumVoters = *z.NumVoters
}

toSpanConfigConstraints := func(src []Constraint) ([]roachpb.Constraint, error) {
spanConfigConstraints := make([]roachpb.Constraint, len(src))
for i, c := range src {
switch c.Type {
case Constraint_REQUIRED:
spanConfigConstraints[i].Type = roachpb.Constraint_REQUIRED
case Constraint_PROHIBITED:
spanConfigConstraints[i].Type = roachpb.Constraint_PROHIBITED
default:
return nil, errors.AssertionFailedf("unknown constraint type: %v", c.Type)
}
spanConfigConstraints[i].Key = c.Key
spanConfigConstraints[i].Value = c.Value
}
return spanConfigConstraints, nil
}

toSpanConfigConstraintsConjunction := func(src []ConstraintsConjunction) ([]roachpb.ConstraintsConjunction, error) {
constraintsConjunction := make([]roachpb.ConstraintsConjunction, len(src))
for i, constraint := range src {
constraintsConjunction[i].NumReplicas = constraint.NumReplicas
constraintsConjunction[i].Constraints, err = toSpanConfigConstraints(constraint.Constraints)
if err != nil {
return nil, err
}
}
return constraintsConjunction, nil
}

sc.Constraints = make([]roachpb.ConstraintsConjunction, len(z.Constraints))
sc.Constraints, err = toSpanConfigConstraintsConjunction(z.Constraints)
if err != nil {
return roachpb.SpanConfig{}, err
}
sc.VoterConstraints, err = toSpanConfigConstraintsConjunction(z.VoterConstraints)
if err != nil {
return roachpb.SpanConfig{}, err
}

sc.LeasePreferences = make([]roachpb.LeasePreference, len(z.LeasePreferences))
for i, leasePreference := range z.LeasePreferences {
sc.LeasePreferences[i].Constraints, err = toSpanConfigConstraints(leasePreference.Constraints)
if err != nil {
return roachpb.SpanConfig{}, err
}
}
return sc, nil
}
12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ func (n Node) TokenBucket(
panic("unimplemented")
}

func (n Node) GetSpanConfigs(
_ context.Context, _ *roachpb.GetSpanConfigsRequest,
) (*roachpb.GetSpanConfigsResponse, error) {
panic("unimplemented")
}

func (n Node) UpdateSpanConfigs(
_ context.Context, _ *roachpb.UpdateSpanConfigsRequest,
) (*roachpb.UpdateSpanConfigsResponse, error) {
panic("unimplemented")
}

// TestSendToOneClient verifies that Send correctly sends a request
// to one server using the heartbeat RPC.
func TestSendToOneClient(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,15 @@ func (m *mockInternalClient) TokenBucket(
) (*roachpb.TokenBucketResponse, error) {
return nil, fmt.Errorf("unsupported TokenBucket call")
}

func (m *mockInternalClient) GetSpanConfigs(
_ context.Context, _ *roachpb.GetSpanConfigsRequest, _ ...grpc.CallOption,
) (*roachpb.GetSpanConfigsResponse, error) {
return nil, fmt.Errorf("unsupported GetSpanConfigs call")
}

func (m *mockInternalClient) UpdateSpanConfigs(
_ context.Context, _ *roachpb.UpdateSpanConfigsRequest, _ ...grpc.CallOption,
) (*roachpb.UpdateSpanConfigsResponse, error) {
return nil, fmt.Errorf("unsupported UpdateSpanConfigs call")
}
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvtenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/server/serverpb",
"//pkg/spanconfig",
"//pkg/util/log",
"//pkg/util/retry",
"@com_github_cockroachdb_errors//:errors",
Expand Down
26 changes: 16 additions & 10 deletions pkg/kv/kvclient/kvtenant/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
Expand All @@ -41,32 +42,37 @@ type Connector interface {
// Start starts the connector.
Start(context.Context) error

// Connector is capable of providing information on each of the KV nodes in
// the cluster in the form of NodeDescriptors. This obviates the need for
// SQL-only tenant processes to join the cluster-wide gossip network.
// NodeDescStore provides information on each of the KV nodes in the cluster
// in the form of NodeDescriptors. This obviates the need for SQL-only
// tenant processes to join the cluster-wide gossip network.
kvcoord.NodeDescStore

// Connector is capable of providing Range addressing information in the
// form of RangeDescriptors through delegated RangeLookup requests. This is
// RangeDescriptorDB provides range addressing information in the form of
// RangeDescriptors through delegated RangeLookup requests. This is
// necessary because SQL-only tenants are restricted from reading Range
// Metadata keys directly. Instead, the RangeLookup requests are proxied
// through existing KV nodes while being subject to additional validation
// (e.g. is the Range being requested owned by the requesting tenant?).
rangecache.RangeDescriptorDB

// Connector is capable of providing a filtered view of the SystemConfig
// SystemConfigProvider provides a filtered view of the SystemConfig
// containing only information applicable to secondary tenants. This
// obviates the need for SQL-only tenant processes to join the cluster-wide
// gossip network.
config.SystemConfigProvider

// Connector is capable of knowing every region in the cluster.
// This is necessary for region validation for zone configurations and
// multi-region primitives.
// RegionsServer provides access to a tenant's available regions. This is
// necessary for region validation for zone configurations and multi-region
// primitives.
serverpb.RegionsServer

// Connector is capable of providing an endpoint for the TokenBucket API.
// TokenBucketProvider provides access to the tenant cost control token
// bucket.
TokenBucketProvider

// KVAccessor provides access to the subset of the cluster's span configs
// applicable to secondary tenants.
spanconfig.KVAccessor
}

// TokenBucketProvider supplies an endpoint (to tenants) for the TokenBucket API
Expand Down
9 changes: 7 additions & 2 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# gazelle:exclude string_test.go
# gazelle:exclude client_test.go

load("@bazel_gomock//:gomock.bzl", "gomock")
load("@rules_proto//proto:defs.bzl", "proto_library")
Expand Down Expand Up @@ -184,15 +185,19 @@ go_test(

# keep
go_test(
name = "string_test",
name = "external_test",
size = "small",
srcs = ["string_test.go"],
srcs = [
"client_test.go",
"string_test.go",
],
deps = [
":with-mocks",
"//pkg/cli/exit",
"//pkg/keys",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/util/hlc",
"//pkg/util/uuid",
"@com_github_cockroachdb_redact//:redact",
Expand Down
Loading

0 comments on commit 584f602

Please sign in to comment.