Skip to content

Commit

Permalink
server,roachpb: introduce basic span config rpcs
Browse files Browse the repository at this point in the history
Part of cockroachdb#67679; these RPCs are exposed through the `kvtenant.Connector`
interface for tenants and also sit on `pkg/server.(*Node)` for the host
tenant. The basic type in these RPCs is the `SpanConfig`, which is the
same as our existing `ZoneConfig` proto type but without any inheritance
business.

The RPCs are backed by the `system.span_configurations` system table
added in an earlier commit. Future PRs will wire a view of this table
into KV with an eye towards replacing our use of `config.SystemConfig`.

---

While here, we al introduce a `crdb_internal.pretty_span` builtin to
help with the readability of this table. In future PRs we'll make use of
this built-in for datadriven tests asserting on the state of the table.

Release note: None
  • Loading branch information
irfansharif committed Aug 18, 2021
1 parent 725cee1 commit 5723cf4
Show file tree
Hide file tree
Showing 39 changed files with 4,641 additions and 605 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2856,6 +2856,8 @@ SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)</p>
</span></td></tr>
<tr><td><a name="crdb_internal.pretty_key"></a><code>crdb_internal.pretty_key(raw_key: <a href="bytes.html">bytes</a>, skip_fields: <a href="int.html">int</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.pretty_span"></a><code>crdb_internal.pretty_span(raw_key_start: <a href="bytes.html">bytes</a>, raw_key_end: <a href="bytes.html">bytes</a>, skip_fields: <a href="int.html">int</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.range_stats"></a><code>crdb_internal.range_stats(key: <a href="bytes.html">bytes</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>This function is used to retrieve range statistics information as a JSON object.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.reset_sql_stats"></a><code>crdb_internal.reset_sql_stats() &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>This function is used to clear the collected SQL statistics.</p>
Expand Down
3 changes: 2 additions & 1 deletion pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ ALL_TESTS = [
"//pkg/migration/migrationcluster:migrationcluster_test",
"//pkg/migration/migrationmanager:migrationmanager_test",
"//pkg/migration/migrations:migrations_test",
"//pkg/roachpb:external_test",
"//pkg/roachpb:roachpb_test",
"//pkg/roachpb:string_test",
"//pkg/rpc/nodedialer:nodedialer_test",
"//pkg/rpc:rpc_test",
"//pkg/security/certmgr:certmgr_test",
Expand All @@ -183,6 +183,7 @@ ALL_TESTS = [
"//pkg/server:server_test",
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig:spanconfig_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catalogkv:catalogkv_test",
"//pkg/sql/catalog/catformat:catformat_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsuppor
func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) }
func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/rpc",
"//pkg/server/serverpb",
"//pkg/spanconfig",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/log",
Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -108,6 +109,9 @@ var _ config.SystemConfigProvider = (*Connector)(nil)
// multi-region primitives.
var _ serverpb.RegionsServer = (*Connector)(nil)

// Connector is capable of accessing span configurations for secondary tenants.
var _ spanconfig.KVAccessor = (*Connector)(nil)

// NewConnector creates a new Connector.
// NOTE: Calling Start will set cfg.RPCContext.ClusterID.
func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
Expand Down Expand Up @@ -502,3 +506,42 @@ func (c *Connector) tryForgetClient(ctx context.Context, client roachpb.Internal
c.mu.client = nil
}
}

// GetSpanConfigEntriesFor implements the spanconfig.KVAccessor interface.
func (c *Connector) GetSpanConfigEntriesFor(
ctx context.Context, spans []roachpb.Span,
) ([][]roachpb.SpanConfigEntry, error) {
for ctx.Err() == nil {
client, err := c.getClient(ctx)
if err != nil {
continue
}
resp, err := client.GetSpanConfigs(ctx, &roachpb.GetSpanConfigsRequest{
Spans: spans,
})
if err != nil {
return nil, err
}
return resp.Unnest(), nil
}
return nil, ctx.Err()
}

// UpdateSpanConfigEntries implements the spanconfig.KVAccessor
// interface.
func (c *Connector) UpdateSpanConfigEntries(
ctx context.Context, update []roachpb.SpanConfigEntry, delete []roachpb.Span,
) error {
for ctx.Err() == nil {
client, err := c.getClient(ctx)
if err != nil {
continue
}
_, err = client.UpdateSpanConfigs(ctx, &roachpb.UpdateSpanConfigsRequest{
SpanConfigsToUpdate: update,
SpansToDelete: delete,
})
return err
}
return ctx.Err()
}
12 changes: 12 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ func (*mockServer) TokenBucket(
panic("unimplemented")
}

func (m *mockServer) GetSpanConfigs(
context.Context, *roachpb.GetSpanConfigsRequest,
) (*roachpb.GetSpanConfigsResponse, error) {
panic("unimplemented")
}

func (m *mockServer) UpdateSpanConfigs(
context.Context, *roachpb.UpdateSpanConfigsRequest,
) (*roachpb.UpdateSpanConfigsResponse, error) {
panic("unimplemented")
}

func gossipEventForClusterID(clusterID uuid.UUID) *roachpb.GossipSubscriptionEvent {
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeyClusterID,
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/zonepb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/sql/opt/cat",
"//pkg/sql/sem/tree",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
"@in_gopkg_yaml_v2//:yaml_v2",
Expand All @@ -30,6 +31,7 @@ go_test(
embed = [":zonepb"],
deps = [
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/util/leaktest",
Expand Down
104 changes: 104 additions & 0 deletions pkg/config/zonepb/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package zonepb

import (
"bytes"
"context"
"fmt"
"strings"
"time"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
)
Expand Down Expand Up @@ -1152,3 +1154,105 @@ func (c *Constraint) GetValue() string {
func (m *GCPolicy) TTL() time.Duration {
return time.Duration(m.TTLSeconds) * time.Second
}

// EnsureFullyHydrated returns an assertion error if the zone config is not
// fully hydrated. A fully hydrated zone configuration must have all required
// fields set, which are RangeMaxBytes, RangeMinBytes, GC, and NumReplicas.
func (z *ZoneConfig) EnsureFullyHydrated() error {
const unhydratedZoneConfigMessage = "expected hydrated zone config: %s unset"
if z.RangeMaxBytes == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "RangeMaxBytes")
}
if z.RangeMinBytes == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "RangeMinBytes")
}
if z.GC == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "GCPolicy")
}
if z.NumReplicas == nil {
return errors.AssertionFailedf(unhydratedZoneConfigMessage, "NumReplicas")
}
return nil
}

// AsSpanConfig converts a fully hydrated zone configuration to an equivalent
// SpanConfig. It fatals if the zone config hasn't been fully hydrated (fields
// are expected to have been cascaded through parent zone configs).
func (z *ZoneConfig) AsSpanConfig() roachpb.SpanConfig {
spanConfig, err := z.toSpanConfig()
if err != nil {
log.Fatalf(context.Background(), "%v", err)
}
return spanConfig
}

func (z *ZoneConfig) toSpanConfig() (roachpb.SpanConfig, error) {
var sc roachpb.SpanConfig
var err error

if err = z.EnsureFullyHydrated(); err != nil {
return sc, err
}

// Copy over the values.
sc.RangeMinBytes = *z.RangeMinBytes
sc.RangeMaxBytes = *z.RangeMaxBytes
sc.GCTTL = z.GC.TTLSeconds

// GlobalReads is false by default.
if z.GlobalReads != nil {
sc.GlobalReads = *z.GlobalReads
}
sc.NumReplicas = *z.NumReplicas
if z.NumVoters != nil {
sc.NumVoters = *z.NumVoters
}

toSpanConfigConstraints := func(src []Constraint) ([]roachpb.Constraint, error) {
spanConfigConstraints := make([]roachpb.Constraint, len(src))
for i, c := range src {
switch c.Type {
case Constraint_REQUIRED:
spanConfigConstraints[i].Type = roachpb.Constraint_REQUIRED
case Constraint_PROHIBITED:
spanConfigConstraints[i].Type = roachpb.Constraint_PROHIBITED
default:
return nil, errors.AssertionFailedf("unknown constraint type: %v", c.Type)
}
spanConfigConstraints[i].Key = c.Key
spanConfigConstraints[i].Value = c.Value
}
return spanConfigConstraints, nil
}

toSpanConfigConstraintsConjunction := func(src []ConstraintsConjunction) ([]roachpb.ConstraintsConjunction, error) {
constraintsConjunction := make([]roachpb.ConstraintsConjunction, len(src))
for i, constraint := range src {
constraintsConjunction[i].NumReplicas = constraint.NumReplicas
constraintsConjunction[i].Constraints, err = toSpanConfigConstraints(constraint.Constraints)
if err != nil {
return nil, err
}
}
return constraintsConjunction, nil
}

sc.Constraints = make([]roachpb.ConstraintsConjunction, len(z.Constraints))
sc.Constraints, err = toSpanConfigConstraintsConjunction(z.Constraints)
if err != nil {
return roachpb.SpanConfig{}, err
}
sc.VoterConstraints, err = toSpanConfigConstraintsConjunction(z.VoterConstraints)
if err != nil {
return roachpb.SpanConfig{}, err
}

sc.LeasePreferences = make([]roachpb.LeasePreference, len(z.LeasePreferences))
for i, leasePreference := range z.LeasePreferences {
sc.LeasePreferences[i].Constraints, err = toSpanConfigConstraints(leasePreference.Constraints)
if err != nil {
return roachpb.SpanConfig{}, err
}
}
return sc, nil
}
Loading

0 comments on commit 5723cf4

Please sign in to comment.