Skip to content

Commit

Permalink
spanconfigkvsubscriber: teach the KVSubscriber about system span configs
Browse files Browse the repository at this point in the history
This patch teaches the KVSubscriber to handle updates to system span
configurations. There's not much here, other than invoking registered
handlers with the correct span when a system span config is updated.
This entails:
- The everything span when the system span config targets the entire
keyspace or targets the host tenant.
- The targeted tenant's span if the system span config targets a
secondary tenant.

Also fix a bug where we weren't correclty initializing handlers.

Release note: None
  • Loading branch information
arulajmani committed Feb 24, 2022
1 parent 0c00289 commit b83e5b3
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 24 deletions.
14 changes: 4 additions & 10 deletions pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
Expand Down Expand Up @@ -46,8 +45,9 @@ import (
// delete [c,e)
// upsert [c,d):C
// upsert [d,e):D
// upsert {entire-keyspace}:X
// delete {source=1,target=20}
// ----
// ok
//
// get
// span [a,b)
Expand Down Expand Up @@ -99,6 +99,7 @@ import (
// Text of the form [a,b) and [a,b):C correspond to spans and span config
// records; see spanconfigtestutils.Parse{Span,Config,SpanConfigRecord} for more
// details.
// TODO(arul): Add ability to express tenant spans to this datadriven test.
func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -241,14 +242,7 @@ func TestDataDriven(t *testing.T) {
if i != 0 && receivedUpdates[i].Equal(receivedUpdates[i-1]) {
continue // de-dup updates
}

var spanStr string
if update.Equal(keys.EverythingSpan) {
spanStr = update.String()
} else {
spanStr = spanconfigtestutils.PrintSpan(update)
}
output.WriteString(fmt.Sprintf("%s\n", spanStr))
output.WriteString(fmt.Sprintf("%s\n", spanconfigtestutils.PrintSpan(update)))
}

return output.String()
Expand Down
23 changes: 17 additions & 6 deletions pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
Expand Down Expand Up @@ -248,15 +249,25 @@ func (s *KVSubscriber) handlePartialUpdate(
handlers := s.mu.handlers
s.mu.Unlock()

for _, h := range handlers {
for i := range handlers {
for _, ev := range events {
// TODO(arul): In the future, once we start reacting to system span
// configurations, we'll want to invoke handlers with the correct span
// here as well.
target := ev.(*bufferEvent).Update.Target
if target.IsSpanTarget() {
h.invoke(ctx, target.GetSpan())
var sp roachpb.Span
switch {
case target.IsSpanTarget():
sp = target.GetSpan()
case target.IsSystemTarget():
var err error
sp, err = target.GetSystemTarget().KeyspaceTargeted()
if err != nil {
// Swallow the error here, even though we never expect this to happen.
log.Warningf(
ctx, "error targeting %s's keyspace %v", target.GetSystemTarget(), err,
)
continue
}
}
handlers[i].invoke(ctx, sp)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/spanconfig/spanconfigkvsubscriber/testdata/basic
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ span [a,f)

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)
[d,f)

Expand All @@ -40,7 +40,6 @@ delete [d,f)

updates
----
/M{in-ax}
[d,f)

store-reader key=d
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ upsert [d,f):D

updates
----
/M{in-ax}
[/Min,/Max)
[a,c)
[d,f)

Expand Down Expand Up @@ -59,7 +59,7 @@ start

updates
----
/M{in-ax}
[/Min,/Max)

store-reader key=a
----
Expand All @@ -75,7 +75,6 @@ upsert [a,c):C

updates
----
/M{in-ax}
[a,c)

store-reader key=a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ start

updates
----
/M{in-ax}
[/Min,/Max)

store-reader key=a
----
Expand Down
127 changes: 127 additions & 0 deletions pkg/spanconfig/spanconfigkvsubscriber/testdata/system_span_configs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@

start
----

update
upsert [a,c):A
upsert [d,f):D
----

updates
----
[/Min,/Max)
[a,c)
[d,f)

store-reader key=a
----
conf=A

store-reader key=e
----
conf=D

store-reader key=g
----
conf=MISSING

update
upsert {entire-keyspace}:+X
----

updates
----
[/Min,/Max)

store-reader key=a
----
conf=A+X

store-reader key=e
----
conf=D+X

# The system span config we set should still apply.
store-reader key=g
----
conf=MISSING+X

# Update span configs that target tenant keyspaces and ensure handlers are
# invoked correctly.
update
upsert {source=1, target=1}:+Y
upsert {source=1,target=20}:+Z
upsert {source=10, target=10}:+W
----

# [/Min,/Tenant/2] corresponds to the system tenant targeting itself.
updates
----
[/Min,/Tenant/2)
[/Tenant/10,/Tenant/11)
[/Tenant/20,/Tenant/21)

# Ensure configs are correctly hydrated when we read them.
store-reader key=a
----
conf=A+X+Y

store-reader key=e
----
conf=D+X+Y

# Delete the system span config over the entire keyspace and ensure handlers are
# invoked correctly + configs for various keys are correctly hydrated.
update
delete {entire-keyspace}
----

updates
----
[/Min,/Max)

store-reader key=a
----
conf=A+Y

store-reader key=e
----
conf=D+Y

# Delete system span configs that target secondary and ensure handlers are
# invoked correctly.
update
delete {source=1,target=20}
----

updates
----
[/Tenant/20,/Tenant/21)

# Update system span config, ensure handlers are correctly invoked, and configs
# are hydrated correctly.
update
upsert {source=1, target=1}:+V
----

updates
----
[/Min,/Tenant/2)

store-reader key=a
----
conf=A+V

store-reader key=e
----
conf=D+V

# Lastly, update a system span config set on a secondary tenant's keyspace and
# ensure handlers are invoked correctly.
update
upsert {source=10, target=10}:+U
----

updates
----
[/Tenant/10,/Tenant/11)
18 changes: 16 additions & 2 deletions pkg/spanconfig/spanconfigtestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var systemTargetRe = regexp.MustCompile(

// configRe matches a single word. It's a shorthand for declaring a unique
// config.
var configRe = regexp.MustCompile(`^(\w+)$`)
var configRe = regexp.MustCompile(`^\+?(\w+)$`)

// ParseSpan is helper function that constructs a roachpb.Span from a string of
// the form "[start, end)".
Expand Down Expand Up @@ -247,7 +247,21 @@ func ParseStoreApplyArguments(t *testing.T, input string) (updates []spanconfig.
// the form "[start,end)". The span is assumed to have been constructed by the
// ParseSpan helper above.
func PrintSpan(sp roachpb.Span) string {
return fmt.Sprintf("[%s,%s)", string(sp.Key), string(sp.EndKey))
s := []string{
sp.Key.String(),
sp.EndKey.String(),
}
for i := range s {
// Raw keys are quoted, so qe unquote them.
if strings.Contains(s[i], "\"") {
var err error
s[i], err = strconv.Unquote(s[i])
if err != nil {
panic(err)
}
}
}
return fmt.Sprintf("[%s,%s)", s[0], s[1])
}

// PrintTarget is a helper function that prints a spanconfig.Target.
Expand Down
30 changes: 30 additions & 0 deletions pkg/spanconfig/systemtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,36 @@ func (st SystemTarget) targetsEntireKeyspace() bool {
return st.systemTargetType == SystemTargetTypeEntireKeyspace
}

// KeyspaceTargeted returns the keyspan the system target applies to.
func (st SystemTarget) KeyspaceTargeted() (roachpb.Span, error) {
switch st.systemTargetType {
case SystemTargetTypeEntireKeyspace:
return keys.EverythingSpan, nil
case SystemTargetTypeSpecificTenantKeyspace:
// If the system tenant's keyspace is being targeted then this means
// everything from the start of the keyspace to where all non-system tenant
// keys begin.
if st.targetTenantID == roachpb.SystemTenantID {
return roachpb.Span{
Key: keys.MinKey,
EndKey: keys.TenantTableDataMin,
}, nil
}
k := keys.MakeTenantPrefix(st.targetTenantID)
return roachpb.Span{
Key: k,
EndKey: k.PrefixEnd(),
}, nil
case SystemTargetTypeAllTenantKeyspaceTargetsSet:
return roachpb.Span{},
errors.AssertionFailedf(
"AllTenantKeyspaceTarget encapsulates other targets; does not target a single keyspace",
)
default:
return roachpb.Span{}, errors.AssertionFailedf("unknown target type")
}
}

// IsReadOnly returns true if the system target is read-only. Read only targets
// should not be persisted.
func (st SystemTarget) IsReadOnly() bool {
Expand Down

0 comments on commit b83e5b3

Please sign in to comment.