Skip to content

Commit

Permalink
Merge #98308
Browse files Browse the repository at this point in the history
98308: kvserver,kvflowcontrol: integrate flow control r=irfansharif a=irfansharif

Part of #95563. See individual commits.

Release note: None

Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
craig[bot] and irfansharif committed Jun 9, 2023
2 parents 33525d5 + 0a6aee0 commit 40c8fe1
Show file tree
Hide file tree
Showing 162 changed files with 10,794 additions and 820 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@

/pkg/geo/ @cockroachdb/spatial

/pkg/inspectz/ @cockroachdb/admission-control

# The KV team generally owns ./pkg/kv/... but not all of it. By convention,
# inside of the /pkg/kv tree, we list out rules for each subdirectory, i.e. when
# a new directory is created CODEOWNERS should mandate a new line below. This
Expand Down
4 changes: 2 additions & 2 deletions .github/bors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ block_labels = ["do-not-merge"]
# Number of seconds from when a merge commit is created to when its statuses
# must pass.
#
# Set to 40 minutes
timeout_sec = 2400
# Set to 1 hour 20 minutes.
timeout_sec = 4800
required_approvals = 1

[committer]
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured tenant-rw
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez tenant-rw
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. tenant-rw
version version 1000023.1-8 set the active cluster version in the format '<major>.<minor>' tenant-rw
version version 1000023.1-10 set the active cluster version in the format '<major>.<minor>' tenant-rw
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,6 @@
<tr><td><div id="setting-trace-snapshot-rate" class="anchored"><code>trace.snapshot.rate</code></div></td><td>duration</td><td><code>0s</code></td><td>if non-zero, interval at which background trace snapshots are captured</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-8</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-10</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
6 changes: 6 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,8 @@ GO_TARGETS = [
"//pkg/gossip/simulation:simulation",
"//pkg/gossip:gossip",
"//pkg/gossip:gossip_test",
"//pkg/inspectz/inspectzpb:inspectzpb",
"//pkg/inspectz:inspectz",
"//pkg/internal/client/requestbatcher:requestbatcher",
"//pkg/internal/client/requestbatcher:requestbatcher_test",
"//pkg/internal/codeowners:codeowners",
Expand Down Expand Up @@ -1327,6 +1329,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
Expand Down Expand Up @@ -2722,6 +2725,8 @@ GET_X_DATA_TARGETS = [
"//pkg/geo/twkb:get_x_data",
"//pkg/gossip:get_x_data",
"//pkg/gossip/simulation:get_x_data",
"//pkg/inspectz:get_x_data",
"//pkg/inspectz/inspectzpb:get_x_data",
"//pkg/internal/client/requestbatcher:get_x_data",
"//pkg/internal/codeowners:get_x_data",
"//pkg/internal/reporoot:get_x_data",
Expand Down Expand Up @@ -2812,6 +2817,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ type TestingKnobs struct {
DialerKnobs ModuleTestingKnobs
ProtectedTS ModuleTestingKnobs
CapturedIndexUsageStatsKnobs ModuleTestingKnobs
AdmissionControlOptions ModuleTestingKnobs // TODO(irfansharif): Remove.
AdmissionControl ModuleTestingKnobs
RaftTransport ModuleTestingKnobs
UnusedIndexRecommendKnobs ModuleTestingKnobs
ExternalConnection ModuleTestingKnobs
EventExporter ModuleTestingKnobs
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -153,6 +154,8 @@ func TestIngest(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 104639, "flaky test")

ctx := context.Background()
t.Run("batch=default", func(t *testing.T) {
runTestIngest(t, func(_ *cluster.Settings) {})
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ crdb_internal kv_catalog_descriptor table admin NULL NULL
crdb_internal kv_catalog_namespace table admin NULL NULL
crdb_internal kv_catalog_zones table admin NULL NULL
crdb_internal kv_dropped_relations view admin NULL NULL
crdb_internal kv_flow_control_handles table admin NULL NULL
crdb_internal kv_flow_controller table admin NULL NULL
crdb_internal kv_flow_token_deductions table admin NULL NULL
crdb_internal kv_inherited_role_members table admin NULL NULL
crdb_internal kv_node_liveness table admin NULL NULL
crdb_internal kv_node_status table admin NULL NULL
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ table_name NOT IN (
'kv_catalog_zones',
'kv_dropped_relations',
'kv_inherited_role_members',
'kv_flow_control_handles',
'kv_flow_controller',
'kv_flow_token_deductions',
'lost_descriptors_with_data',
'table_columns',
'table_row_statistics',
Expand Down
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ const (
// the system tenant.
V23_2_EnableRangeCoalescingForSystemTenant

// V23_2_UseACRaftEntryEntryEncodings gates the use of raft entry encodings
// that (optionally) embed below-raft admission data.
V23_2_UseACRaftEntryEntryEncodings

// *************************************************
// Step (1) Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -935,6 +939,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_2_EnableRangeCoalescingForSystemTenant,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 8},
},
{
Key: V23_2_UseACRaftEntryEntryEncodings,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 10},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
2 changes: 2 additions & 0 deletions pkg/gen/protobuf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ PROTOBUF_SRCS = [
"//pkg/geo/geoindex:geoindex_go_proto",
"//pkg/geo/geopb:geopb_go_proto",
"//pkg/gossip:gossip_go_proto",
"//pkg/inspectz/inspectzpb:inspectzpb_go_proto",
"//pkg/jobs/jobspb:jobspb_go_proto",
"//pkg/keyvisualizer/keyvispb:keyvispb_go_proto",
"//pkg/kv/bulk/bulkpb:bulkpb_go_proto",
Expand All @@ -34,6 +35,7 @@ PROTOBUF_SRCS = [
"//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
"//pkg/kv/kvserver/concurrency/poison:poison_go_proto",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb_go_proto",
"//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto",
Expand Down
22 changes: 22 additions & 0 deletions pkg/inspectz/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "inspectz",
srcs = [
"inspectz.go",
"unsupported.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/inspectz",
visibility = ["//visibility:public"],
deps = [
"//pkg/inspectz/inspectzpb",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/roachpb",
"//pkg/util/errorutil",
"//pkg/util/log",
],
)

get_x_data(name = "get_x_data")
151 changes: 151 additions & 0 deletions pkg/inspectz/inspectz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package inspectz

import (
"context"
"encoding/json"
"net/http"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// URLPrefix is the prefix for all inspectz endpoints hosted by the server.
const URLPrefix = "/inspectz/"

// Server is a concrete implementation of the InspectzServer interface,
// organizing everything under /inspectz/*. It's the top-level component that
// houses parsing logic for common inspectz URL parameters and maintains routing
// logic.
type Server struct {
log.AmbientContext

mux *http.ServeMux
handles kvflowcontrol.Handles
kvflowController kvflowcontrol.Controller
}

var _ inspectzpb.InspectzServer = &Server{}

// NewServer sets up an inspectz server.
func NewServer(
ambient log.AmbientContext,
handles kvflowcontrol.Handles,
kvflowController kvflowcontrol.Controller,
) *Server {
mux := http.NewServeMux()
server := &Server{
AmbientContext: ambient,

mux: mux,
handles: handles,
kvflowController: kvflowController,
}
mux.Handle("/inspectz/kvflowhandles", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.HandlesRequest{}
if rangeIDs, ok := parseRangeIDs(r.URL.Query().Get("ranges"), w); ok {
req.RangeIDs = rangeIDs
}
resp, err := server.KVFlowHandles(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))
mux.Handle("/inspectz/kvflowcontroller", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
ctx := server.AnnotateCtx(context.Background())

req := &kvflowinspectpb.ControllerRequest{}
resp, err := server.KVFlowController(ctx, req)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
respond(ctx, w, http.StatusOK, resp)
},
))

return server
}

// KVFlowController implements the InspectzServer interface.
func (s *Server) KVFlowController(
ctx context.Context, request *kvflowinspectpb.ControllerRequest,
) (*kvflowinspectpb.ControllerResponse, error) {
return &kvflowinspectpb.ControllerResponse{
Streams: s.kvflowController.Inspect(ctx),
}, nil
}

// KVFlowHandles implements the InspectzServer interface.
func (s *Server) KVFlowHandles(
ctx context.Context, request *kvflowinspectpb.HandlesRequest,
) (*kvflowinspectpb.HandlesResponse, error) {
resp := &kvflowinspectpb.HandlesResponse{}
if len(request.RangeIDs) == 0 {
request.RangeIDs = s.handles.Inspect()
}
for _, rangeID := range request.RangeIDs {
handle, found := s.handles.Lookup(rangeID)
if !found {
continue // nothing to do
}
resp.Handles = append(resp.Handles, handle.Inspect(ctx))
}
return resp, nil
}

// ServeHTTP serves various tools under the /debug endpoint.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mux.ServeHTTP(w, r)
}

func respond(ctx context.Context, w http.ResponseWriter, code int, payload interface{}) {
res, err := json.Marshal(payload)
if err != nil {
log.ErrorfDepth(ctx, 1, "%s", err)
http.Error(w, "internal error: check logs for details", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_, _ = w.Write(res)
}

func parseRangeIDs(input string, w http.ResponseWriter) (ranges []roachpb.RangeID, ok bool) {
if len(input) == 0 {
return nil, true
}
for _, part := range strings.Split(input, ",") {
rangeID, err := strconv.ParseInt(part, 10, 64)
if err != nil {
http.Error(w, "invalid range id", http.StatusBadRequest)
return nil, false
}

ranges = append(ranges, roachpb.RangeID(rangeID))
}
return ranges, true
}
36 changes: 36 additions & 0 deletions pkg/inspectz/inspectzpb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
name = "inspectzpb_proto",
srcs = ["inspectz.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb:kvflowinspectpb_proto",
"@go_googleapis//google/api:annotations_proto",
],
)

go_proto_library(
name = "inspectzpb_go_proto",
compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_grpc_compiler"],
importpath = "github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb",
proto = ":inspectzpb_proto",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"@org_golang_google_genproto//googleapis/api/annotations:go_default_library",
],
)

go_library(
name = "inspectzpb",
embed = [":inspectzpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/inspectz/inspectzpb",
visibility = ["//visibility:public"],
)

get_x_data(name = "get_x_data")
Loading

0 comments on commit 40c8fe1

Please sign in to comment.