diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 61609a1a933a..8a860ae412aa 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -213,6 +213,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/idalloc:idalloc_test", "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", @@ -1252,6 +1253,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", @@ -2670,6 +2673,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel new file mode 100644 index 000000000000..a286451bb7e5 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel @@ -0,0 +1,36 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "kvflowdispatch", + srcs = ["kvflowdispatch.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/roachpb", + "//pkg/util/admission/admissionpb", + "//pkg/util/syncutil", + ], +) + +go_test( + name = "kvflowdispatch_test", + srcs = ["kvflowdispatch_test.go"], + args = ["-test.timeout=295s"], + data = glob(["testdata/**"]), + embed = [":kvflowdispatch"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/roachpb", + "//pkg/testutils/datapathutils", + "//pkg/util/admission/admissionpb", + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go new file mode 100644 index 000000000000..fddb450425cb --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go @@ -0,0 +1,107 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowdispatch + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// Dispatch is a concrete implementation of the kvflowcontrol.Dispatch +// interface. It's used to (i) dispatch information about admitted raft log +// entries to specific nodes, and (ii) to read pending dispatches. +type Dispatch struct { + mu struct { + syncutil.Mutex + // outbox maintains pending dispatches on a per-node basis. + outbox map[roachpb.NodeID]dispatches + } +} + +// dispatchKey is used to coalesce dispatches bound for a given node. If +// transmitting two kvflowcontrolpb.AdmittedRaftLogEntries with the same +// triple, with UpToRaftLogPositions L1 and L2 +// where L1 < L2, we can simply dispatch the one with L2. +type dispatchKey struct { + roachpb.RangeID + roachpb.StoreID + admissionpb.WorkPriority +} + +type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition + +var _ kvflowcontrol.Dispatch = &Dispatch{} + +// New constructs a new Dispatch. +func New() *Dispatch { + d := &Dispatch{} + d.mu.outbox = make(map[roachpb.NodeID]dispatches) + return d +} + +// Dispatch is part of the kvflowcontrol.Dispatch interface. +func (d *Dispatch) Dispatch(nodeID roachpb.NodeID, entries kvflowcontrolpb.AdmittedRaftLogEntries) { + d.mu.Lock() + defer d.mu.Unlock() + + if _, ok := d.mu.outbox[nodeID]; !ok { + d.mu.outbox[nodeID] = dispatches{} + } + + dk := dispatchKey{ + entries.RangeID, + entries.StoreID, + admissionpb.WorkPriority(entries.AdmissionPriority), + } + existing, found := d.mu.outbox[nodeID][dk] + if !found || existing.Less(entries.UpToRaftLogPosition) { + d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition + } +} + +// PendingDispatch is part of the kvflowcontrol.Dispatch interface. +func (d *Dispatch) PendingDispatch() []roachpb.NodeID { + d.mu.Lock() + defer d.mu.Unlock() + + nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox)) + for node := range d.mu.outbox { + nodes = append(nodes, node) + } + return nodes +} + +// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface. +func (d *Dispatch) PendingDispatchFor( + nodeID roachpb.NodeID, +) []kvflowcontrolpb.AdmittedRaftLogEntries { + d.mu.Lock() + defer d.mu.Unlock() + + if _, ok := d.mu.outbox[nodeID]; !ok { + return nil + } + + var entries []kvflowcontrolpb.AdmittedRaftLogEntries + for key, dispatch := range d.mu.outbox[nodeID] { + entries = append(entries, kvflowcontrolpb.AdmittedRaftLogEntries{ + RangeID: key.RangeID, + StoreID: key.StoreID, + AdmissionPriority: int32(key.WorkPriority), + UpToRaftLogPosition: dispatch, + }) + } + delete(d.mu.outbox, nodeID) + return entries +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go new file mode 100644 index 000000000000..5caf3f67343a --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -0,0 +1,169 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowdispatch + +import ( + "fmt" + "sort" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestDispatch(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) + for k, v := range admissionpb.WorkPriorityDict { + reverseWorkPriorityDict[v] = k + } + + datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { + var dispatch *Dispatch + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + dispatch = New() + return "" + + case "dispatch": + require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)") + + for _, line := range strings.Split(d.Input, "\n") { + parts := strings.Fields(line) + require.Len(t, parts, 5, "expected form 'node=n range=r pri= store=s up-to-log-position=/'") + + var ( + entries kvflowcontrolpb.AdmittedRaftLogEntries + nodeID roachpb.NodeID + ) + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + inner := strings.Split(parts[i], "=") + require.Len(t, inner, 2) + arg := strings.TrimSpace(inner[1]) + + switch { + case strings.HasPrefix(parts[i], "node="): + // Parse node=n. + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + nodeID = roachpb.NodeID(ni) + + case strings.HasPrefix(parts[i], "range="): + // Parse range=r. + ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r")) + require.NoError(t, err) + entries.RangeID = roachpb.RangeID(ri) + + case strings.HasPrefix(parts[i], "store="): + // Parse store=s. + si, err := strconv.Atoi(strings.TrimPrefix(arg, "s")) + require.NoError(t, err) + entries.StoreID = roachpb.StoreID(si) + + case strings.HasPrefix(parts[i], "pri="): + // Parse pri=. + pri, found := reverseWorkPriorityDict[arg] + require.True(t, found) + entries.AdmissionPriority = int32(pri) + + case strings.HasPrefix(parts[i], "up-to-log-position="): + // Parse up-to-log-position=/. + entries.UpToRaftLogPosition = parseLogPosition(t, arg) + + default: + t.Fatalf("unrecognized prefix: %s", parts[i]) + } + } + dispatch.Dispatch(nodeID, entries) + } + return "" + + case "pending-dispatch": + require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)") + var buf strings.Builder + nodes := dispatch.PendingDispatch() + sort.Slice(nodes, func(i, j int) bool { // for determinism + return nodes[i] < nodes[j] + }) + for i, node := range nodes { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString(fmt.Sprintf("node=n%d", node)) + } + return buf.String() + + case "pending-dispatch-for": + require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)") + var arg string + d.ScanArgs(t, "node", &arg) + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + var buf strings.Builder + es := dispatch.PendingDispatchFor(roachpb.NodeID(ni)) + sort.Slice(es, func(i, j int) bool { // for determinism + if es[i].RangeID != es[j].RangeID { + return es[i].RangeID < es[j].RangeID + } + if es[i].StoreID != es[j].StoreID { + return es[i].StoreID < es[j].StoreID + } + if es[i].AdmissionPriority != es[j].AdmissionPriority { + return es[i].AdmissionPriority < es[j].AdmissionPriority + } + return es[i].UpToRaftLogPosition.Less(es[j].UpToRaftLogPosition) + }) + for i, entries := range es { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString( + fmt.Sprintf("range=r%d pri=%s store=s%d up-to-log-position=%s", + entries.RangeID, + admissionpb.WorkPriority(entries.AdmissionPriority), + entries.StoreID, + entries.UpToRaftLogPosition, + ), + ) + } + return buf.String() + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) + }) +} + +func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPosition { + inner := strings.Split(input, "/") + require.Len(t, inner, 2) + term, err := strconv.Atoi(inner[0]) + require.NoError(t, err) + index, err := strconv.Atoi(inner[1]) + require.NoError(t, err) + return kvflowcontrolpb.RaftLogPosition{ + Term: uint64(term), + Index: uint64(index), + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering new file mode 100644 index 000000000000..947588448e74 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/log_position_ordering @@ -0,0 +1,25 @@ +# Verify that dispatches get coalesced correctly. All things equal, if +# dispatching with a higher up-to-log-position, we'll ignore the lower entries. + +init +---- + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=5/20 +---- + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=5/20 + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/20 +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/19 +---- + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20 + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores new file mode 100644 index 000000000000..629b4be08ca3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/multiple_nodes_priorities_stores @@ -0,0 +1,40 @@ +# Verify that we can maintain dispatches for: +# - multiple nodes (n1 and n2 below, reading from one doesn't affect the +# other); +# - multiple stores (s2 and s3 from n2 below, where dispatches are not +# coalesced); +# - multiple priorities (high-pri and normal-pri for below, where +# dispatches are not coalesced); +init +---- + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 +node=n2 range=r2 pri=normal-pri store=s2 up-to-log-position=5/20 +node=n2 range=r3 pri=normal-pri store=s3 up-to-log-position=5/21 +node=n2 range=r3 pri=high-pri store=s3 up-to-log-position=5/22 +---- + +pending-dispatch +---- +node=n1 +node=n2 + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=4/20 + +pending-dispatch +---- +node=n2 + +pending-dispatch-for node=n2 +---- +range=r2 pri=normal-pri store=s2 up-to-log-position=log-position=5/20 +range=r3 pri=normal-pri store=s3 up-to-log-position=log-position=5/21 +range=r3 pri=high-pri store=s3 up-to-log-position=log-position=5/22 + +pending-dispatch +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch new file mode 100644 index 000000000000..c74a021ecce3 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/testdata/single_dispatch @@ -0,0 +1,25 @@ +# Verify that we can issue a single dispatch, and that it gets removed +# appropriately. + +init +---- + +dispatch +node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20 +---- + +pending-dispatch +---- +node=n1 + +pending-dispatch-for node=n1 +---- +range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=4/20 + +pending-dispatch +---- + +pending-dispatch-for node=n1 +---- + +# vim:ft=sh