Skip to content

Commit

Permalink
kvflowcontrol: implement kvflowcontrol.Dispatch
Browse files Browse the repository at this point in the history
Part of #95563. Dispatch is a concrete implementation of the
kvflowcontrol.Dispatch interface. It's used to dispatch information
about admitted raft log entries to the specific nodes where (i) said
entries originated, (ii) flow tokens were deducted and (iii) are waiting
to be returned. This type is also used to read pending dispatches, which
will be used in the raft transport layer when looking to piggyback
information on traffic already bound to specific nodes. Since timely
dispatching (read: piggybacking) is not guaranteed, we allow querying
for all long-overdue dispatches.

Internally it's able to coalesce dispatches bound for a given node. If
dispatching admission information for two log entries with the same
<RangeID,StoreID,WorkPriority> triple, with log positions L1 and L2
where L1 < L2, we can simply dispatch the one with L2. We leave the
integration of this type with the {Store,}WorkQueue (#97599) + raft
transport to future PRs.

Release note: None
  • Loading branch information
irfansharif committed Mar 7, 2023
1 parent f1a4c63 commit 1b01e63
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
Expand Down Expand Up @@ -1252,6 +1253,8 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
Expand Down Expand Up @@ -2670,6 +2673,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "kvflowdispatch",
srcs = ["kvflowdispatch.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
"//pkg/util/syncutil",
],
)

go_test(
name = "kvflowdispatch_test",
srcs = ["kvflowdispatch_test.go"],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":kvflowdispatch"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/leaktest",
"//pkg/util/log",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
107 changes: 107 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvflowdispatch

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// Dispatch is a concrete implementation of the kvflowcontrol.Dispatch
// interface. It's used to (i) dispatch information about admitted raft log
// entries to specific nodes, and (ii) to read pending dispatches.
type Dispatch struct {
mu struct {
syncutil.Mutex
// outbox maintains pending dispatches on a per-node basis.
outbox map[roachpb.NodeID]dispatches
}
}

// dispatchKey is used to coalesce dispatches bound for a given node. If
// transmitting two kvflowcontrolpb.AdmittedRaftLogEntries with the same
// <RangeID,StoreID,WorkPriority> triple, with UpToRaftLogPositions L1 and L2
// where L1 < L2, we can simply dispatch the one with L2.
type dispatchKey struct {
roachpb.RangeID
roachpb.StoreID
admissionpb.WorkPriority
}

type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition

var _ kvflowcontrol.Dispatch = &Dispatch{}

// New constructs a new Dispatch.
func New() *Dispatch {
d := &Dispatch{}
d.mu.outbox = make(map[roachpb.NodeID]dispatches)
return d
}

// Dispatch is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) Dispatch(nodeID roachpb.NodeID, entries kvflowcontrolpb.AdmittedRaftLogEntries) {
d.mu.Lock()
defer d.mu.Unlock()

if _, ok := d.mu.outbox[nodeID]; !ok {
d.mu.outbox[nodeID] = dispatches{}
}

dk := dispatchKey{
entries.RangeID,
entries.StoreID,
admissionpb.WorkPriority(entries.AdmissionPriority),
}
existing, found := d.mu.outbox[nodeID][dk]
if !found || existing.Less(entries.UpToRaftLogPosition) {
d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition
}
}

// PendingDispatch is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
d.mu.Lock()
defer d.mu.Unlock()

nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox))
for node := range d.mu.outbox {
nodes = append(nodes, node)
}
return nodes
}

// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatchFor(
nodeID roachpb.NodeID,
) []kvflowcontrolpb.AdmittedRaftLogEntries {
d.mu.Lock()
defer d.mu.Unlock()

if _, ok := d.mu.outbox[nodeID]; !ok {
return nil
}

var entries []kvflowcontrolpb.AdmittedRaftLogEntries
for key, dispatch := range d.mu.outbox[nodeID] {
entries = append(entries, kvflowcontrolpb.AdmittedRaftLogEntries{
RangeID: key.RangeID,
StoreID: key.StoreID,
AdmissionPriority: int32(key.WorkPriority),
UpToRaftLogPosition: dispatch,
})
}
delete(d.mu.outbox, nodeID)
return entries
}
169 changes: 169 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvflowdispatch

import (
"fmt"
"sort"
"strconv"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

func TestDispatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority)
for k, v := range admissionpb.WorkPriorityDict {
reverseWorkPriorityDict[v] = k
}

datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
var dispatch *Dispatch
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "init":
dispatch = New()
return ""

case "dispatch":
require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)")

for _, line := range strings.Split(d.Input, "\n") {
parts := strings.Fields(line)
require.Len(t, parts, 5, "expected form 'node=n<int> range=r<int> pri=<string> store=s<int> up-to-log-position=<int>/<int>'")

var (
entries kvflowcontrolpb.AdmittedRaftLogEntries
nodeID roachpb.NodeID
)
for i := range parts {
parts[i] = strings.TrimSpace(parts[i])
inner := strings.Split(parts[i], "=")
require.Len(t, inner, 2)
arg := strings.TrimSpace(inner[1])

switch {
case strings.HasPrefix(parts[i], "node="):
// Parse node=n<int>.
ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n"))
require.NoError(t, err)
nodeID = roachpb.NodeID(ni)

case strings.HasPrefix(parts[i], "range="):
// Parse range=r<int>.
ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r"))
require.NoError(t, err)
entries.RangeID = roachpb.RangeID(ri)

case strings.HasPrefix(parts[i], "store="):
// Parse store=s<int>.
si, err := strconv.Atoi(strings.TrimPrefix(arg, "s"))
require.NoError(t, err)
entries.StoreID = roachpb.StoreID(si)

case strings.HasPrefix(parts[i], "pri="):
// Parse pri=<string>.
pri, found := reverseWorkPriorityDict[arg]
require.True(t, found)
entries.AdmissionPriority = int32(pri)

case strings.HasPrefix(parts[i], "up-to-log-position="):
// Parse up-to-log-position=<int>/<int>.
entries.UpToRaftLogPosition = parseLogPosition(t, arg)

default:
t.Fatalf("unrecognized prefix: %s", parts[i])
}
}
dispatch.Dispatch(nodeID, entries)
}
return ""

case "pending-dispatch":
require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)")
var buf strings.Builder
nodes := dispatch.PendingDispatch()
sort.Slice(nodes, func(i, j int) bool { // for determinism
return nodes[i] < nodes[j]
})
for i, node := range nodes {
if i != 0 {
buf.WriteString("\n")
}
buf.WriteString(fmt.Sprintf("node=n%d", node))
}
return buf.String()

case "pending-dispatch-for":
require.NotNilf(t, dispatch, "uninitialized dispatch (did you use 'init'?)")
var arg string
d.ScanArgs(t, "node", &arg)
ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n"))
require.NoError(t, err)
var buf strings.Builder
es := dispatch.PendingDispatchFor(roachpb.NodeID(ni))
sort.Slice(es, func(i, j int) bool { // for determinism
if es[i].RangeID != es[j].RangeID {
return es[i].RangeID < es[j].RangeID
}
if es[i].StoreID != es[j].StoreID {
return es[i].StoreID < es[j].StoreID
}
if es[i].AdmissionPriority != es[j].AdmissionPriority {
return es[i].AdmissionPriority < es[j].AdmissionPriority
}
return es[i].UpToRaftLogPosition.Less(es[j].UpToRaftLogPosition)
})
for i, entries := range es {
if i != 0 {
buf.WriteString("\n")
}
buf.WriteString(
fmt.Sprintf("range=r%d pri=%s store=s%d up-to-log-position=%s",
entries.RangeID,
admissionpb.WorkPriority(entries.AdmissionPriority),
entries.StoreID,
entries.UpToRaftLogPosition,
),
)
}
return buf.String()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
})
})
}

func parseLogPosition(t *testing.T, input string) kvflowcontrolpb.RaftLogPosition {
inner := strings.Split(input, "/")
require.Len(t, inner, 2)
term, err := strconv.Atoi(inner[0])
require.NoError(t, err)
index, err := strconv.Atoi(inner[1])
require.NoError(t, err)
return kvflowcontrolpb.RaftLogPosition{
Term: uint64(term),
Index: uint64(index),
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Verify that dispatches get coalesced correctly. All things equal, if
# dispatching with a higher up-to-log-position, we'll ignore the lower entries.

init
----

dispatch
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=4/20
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=5/20
----

pending-dispatch-for node=n1
----
range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=5/20

dispatch
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/20
node=n1 range=r1 pri=normal-pri store=s1 up-to-log-position=6/19
----

pending-dispatch-for node=n1
----
range=r1 pri=normal-pri store=s1 up-to-log-position=log-position=6/20

# vim:ft=sh
Loading

0 comments on commit 1b01e63

Please sign in to comment.