Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver,kvflowcontrol: integrate flow control #98308

Merged
merged 12 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@

/pkg/geo/ @cockroachdb/spatial

/pkg/inspectz/ @cockroachdb/admission-control

# The KV team generally owns ./pkg/kv/... but not all of it. By convention,
# inside of the /pkg/kv tree, we list out rules for each subdirectory, i.e. when
# a new directory is created CODEOWNERS should mandate a new line below. This
Expand Down
4 changes: 2 additions & 2 deletions .github/bors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ block_labels = ["do-not-merge"]
# Number of seconds from when a merge commit is created to when its statuses
# must pass.
#
# Set to 40 minutes
timeout_sec = 2400
# Set to 1 hour 20 minutes.
timeout_sec = 4800
required_approvals = 1

[committer]
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured tenant-rw
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez tenant-rw
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. tenant-rw
version version 1000023.1-8 set the active cluster version in the format '<major>.<minor>' tenant-rw
version version 1000023.1-10 set the active cluster version in the format '<major>.<minor>' tenant-rw
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,6 @@
<tr><td><div id="setting-trace-snapshot-rate" class="anchored"><code>trace.snapshot.rate</code></div></td><td>duration</td><td><code>0s</code></td><td>if non-zero, interval at which background trace snapshots are captured</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-8</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-10</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
6 changes: 6 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,8 @@ GO_TARGETS = [
"//pkg/gossip/simulation:simulation",
"//pkg/gossip:gossip",
"//pkg/gossip:gossip_test",
"//pkg/inspectz/inspectzpb:inspectzpb",
"//pkg/inspectz:inspectz",
"//pkg/internal/client/requestbatcher:requestbatcher",
"//pkg/internal/client/requestbatcher:requestbatcher_test",
"//pkg/internal/codeowners:codeowners",
Expand Down Expand Up @@ -1327,6 +1329,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
Expand Down Expand Up @@ -2722,6 +2725,8 @@ GET_X_DATA_TARGETS = [
"//pkg/geo/twkb:get_x_data",
"//pkg/gossip:get_x_data",
"//pkg/gossip/simulation:get_x_data",
"//pkg/inspectz:get_x_data",
"//pkg/inspectz/inspectzpb:get_x_data",
"//pkg/internal/client/requestbatcher:get_x_data",
"//pkg/internal/codeowners:get_x_data",
"//pkg/internal/reporoot:get_x_data",
Expand Down Expand Up @@ -2812,6 +2817,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type TestingKnobs struct {
DialerKnobs ModuleTestingKnobs
ProtectedTS ModuleTestingKnobs
CapturedIndexUsageStatsKnobs ModuleTestingKnobs
AdmissionControlOptions ModuleTestingKnobs // TODO(irfansharif): Remove.
AdmissionControl ModuleTestingKnobs
RaftTransport ModuleTestingKnobs
UnusedIndexRecommendKnobs ModuleTestingKnobs
ExternalConnection ModuleTestingKnobs
EventExporter ModuleTestingKnobs
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -153,6 +154,8 @@ func TestIngest(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 104639, "flaky test")

ctx := context.Background()
t.Run("batch=default", func(t *testing.T) {
runTestIngest(t, func(_ *cluster.Settings) {})
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ crdb_internal kv_catalog_descriptor table admin NULL NULL
crdb_internal kv_catalog_namespace table admin NULL NULL
crdb_internal kv_catalog_zones table admin NULL NULL
crdb_internal kv_dropped_relations view admin NULL NULL
crdb_internal kv_flow_control_handles table admin NULL NULL
crdb_internal kv_flow_controller table admin NULL NULL
crdb_internal kv_flow_token_deductions table admin NULL NULL
crdb_internal kv_inherited_role_members table admin NULL NULL
crdb_internal kv_node_liveness table admin NULL NULL
crdb_internal kv_node_status table admin NULL NULL
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ table_name NOT IN (
'kv_catalog_zones',
'kv_dropped_relations',
'kv_inherited_role_members',
'kv_flow_control_handles',
'kv_flow_controller',
'kv_flow_token_deductions',
'lost_descriptors_with_data',
'table_columns',
'table_row_statistics',
Expand Down
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ const (
// the system tenant.
V23_2_EnableRangeCoalescingForSystemTenant

// V23_2_UseACRaftEntryEntryEncodings gates the use of raft entry encodings
// that (optionally) embed below-raft admission data.
V23_2_UseACRaftEntryEntryEncodings

// *************************************************
// Step (1) Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -935,6 +939,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_2_EnableRangeCoalescingForSystemTenant,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 8},
},
{
Key: V23_2_UseACRaftEntryEntryEncodings,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 10},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
2 changes: 2 additions & 0 deletions pkg/gen/protobuf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ PROTOBUF_SRCS = [
"//pkg/geo/geoindex:geoindex_go_proto",
"//pkg/geo/geopb:geopb_go_proto",
"//pkg/gossip:gossip_go_proto",
"//pkg/inspectz/inspectzpb:inspectzpb_go_proto",
"//pkg/jobs/jobspb:jobspb_go_proto",
"//pkg/keyvisualizer/keyvispb:keyvispb_go_proto",
"//pkg/kv/bulk/bulkpb:bulkpb_go_proto",
Expand All @@ -34,6 +35,7 @@ PROTOBUF_SRCS = [
"//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
"//pkg/kv/kvserver/concurrency/poison:poison_go_proto",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb_go_proto",
"//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto",
Expand Down
22 changes: 22 additions & 0 deletions pkg/inspectz/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "inspectz",
srcs = [
"inspectz.go",
"unsupported.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/inspectz",
visibility = ["//visibility:public"],
deps = [
"//pkg/inspectz/inspectzpb",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/roachpb",
"//pkg/util/errorutil",
"//pkg/util/log",
],
)

get_x_data(name = "get_x_data")
151 changes: 151 additions & 0 deletions pkg/inspectz/inspectz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package inspectz

import (
"context"
"encoding/json"
"net/http"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// URLPrefix is the prefix for all inspectz endpoints hosted by the server.
const URLPrefix = "/inspectz/"

// Server is a concrete implementation of the InspectzServer interface,
// organizing everything under /inspectz/*. It's the top-level component that
// houses parsing logic for common inspectz URL parameters and maintains routing
// logic.
type Server struct {
log.AmbientContext

mux *http.ServeMux
handles kvflowcontrol.Handles
kvflowController kvflowcontrol.Controller
}

var _ inspectzpb.InspectzServer = &Server{}

// NewServer sets up an inspectz server.
func NewServer(
ambient log.AmbientContext,
handles kvflowcontrol.Handles,
kvflowController kvflowcontrol.Controller,
) *Server {
mux := http.NewServeMux()
server := &Server{
AmbientContext: ambient,

mux: mux,
handles: handles,
kvflowController: kvflowController,
}
mux.Handle("/inspectz/kvflowhandles", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.HandlesRequest{}
if rangeIDs, ok := parseRangeIDs(r.URL.Query().Get("ranges"), w); ok {
req.RangeIDs = rangeIDs
}
resp, err := server.KVFlowHandles(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))
mux.Handle("/inspectz/kvflowcontroller", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.ControllerRequest{}
resp, err := server.KVFlowController(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))

return server
}

// KVFlowController implements the InspectzServer interface.
func (s *Server) KVFlowController(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return &kvflowinspectpb.ControllerResponse{
Streams: s.kvflowController.Inspect(ctx),
}, nil
}

// KVFlowHandles implements the InspectzServer interface.
func (s *Server) KVFlowHandles(
ctx context.Context, request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error) {
resp := &kvflowinspectpb.HandlesResponse{}
if len(request.RangeIDs) == 0 {
request.RangeIDs = s.handles.Inspect()
}
for _, rangeID := range request.RangeIDs {
handle, found := s.handles.Lookup(rangeID)
if !found {
continue // nothing to do
}
resp.Handles = append(resp.Handles, handle.Inspect(ctx))
}
return resp, nil
}

// ServeHTTP serves various tools under the /debug endpoint.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mux.ServeHTTP(w, r)
}

func respond(ctx context.Context, w http.ResponseWriter, code int, payload interface{}) {
res, err := json.Marshal(payload)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_, _ = w.Write(res)
}

func parseRangeIDs(input string, w http.ResponseWriter) (ranges []roachpb.RangeID, ok bool) {
if len(input) == 0 {
return nil, true
}
for _, part := range strings.Split(input, ",") {
rangeID, err := strconv.ParseInt(part, 10, 64)
if err != nil {
http.Error(w, "invalid range id", http.StatusBadRequest)
return nil, false
}

ranges = append(ranges, roachpb.RangeID(rangeID))
}
return ranges, true
}
36 changes: 36 additions & 0 deletions pkg/inspectz/inspectzpb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
name = "inspectzpb_proto",
srcs = ["inspectz.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb_proto",
"@go_googleapis//google/api:annotations_proto",
],
)

go_proto_library(
name = "inspectzpb_go_proto",
compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_grpc_compiler"],
importpath = "github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb",
proto = ":inspectzpb_proto",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"@org_golang_google_genproto//googleapis/api/annotations:go_default_library",
],
)

go_library(
name = "inspectzpb",
embed = [":inspectzpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb",
visibility = ["//visibility:public"],
)

get_x_data(name = "get_x_data")
Loading