From f532d5a49d91752cdc30d43dda383299966cede1 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Wed, 6 Jul 2022 11:33:34 -0400 Subject: [PATCH 1/5] ui: persist stmt view selection in sql activity page Previously, the selection of viewing historical or active executions for statements and transactions tabs in the SQL activity page did not persist on tab change. This commit persists the selection between tab changes in the SQL activity page. Release note (ui change): In the SQL Activity Page, the selection to view historical or active executions will persist between tabs. --- .../src/views/sqlActivity/sqlActivityPage.tsx | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/ui/workspaces/db-console/src/views/sqlActivity/sqlActivityPage.tsx b/pkg/ui/workspaces/db-console/src/views/sqlActivity/sqlActivityPage.tsx index ce11f6861d32..35e634360f63 100644 --- a/pkg/ui/workspaces/db-console/src/views/sqlActivity/sqlActivityPage.tsx +++ b/pkg/ui/workspaces/db-console/src/views/sqlActivity/sqlActivityPage.tsx @@ -11,7 +11,7 @@ // All changes made on this file, should also be done on the equivalent // file on managed-service repo. -import React from "react"; +import React, { useState } from "react"; import Helmet from "react-helmet"; import { Tabs } from "antd"; import "antd/lib/tabs/style"; @@ -20,7 +20,7 @@ import TransactionsPageConnected from "src/views/transactions/transactionsPage"; import StatementsPageConnected from "src/views/statements/statementsPage"; import { commonStyles, util } from "@cockroachlabs/cluster-ui"; import { RouteComponentProps } from "react-router-dom"; -import { tabAttr } from "src/util/constants"; +import { tabAttr, viewAttr } from "src/util/constants"; const { TabPane } = Tabs; @@ -36,10 +36,23 @@ export const SQL_ACTIVITY_DEFAULT_TAB: SQLActivityTabType = const SQLActivityPage = (props: RouteComponentProps) => { const currentTab = util.queryByName(props.location, tabAttr) || SQLActivityTabType.Statements; + const currentView = util.queryByName(props.location, viewAttr); + const [restoreStmtsViewParam, setRestoreStmtsViewParam] = useState< + string | null + >(currentView); const onTabChange = (tabId: string): void => { + const params = new URLSearchParams({ tab: tabId }); + if (tabId === "Sessions") { + setRestoreStmtsViewParam(currentView); + } else if (currentView || restoreStmtsViewParam) { + // We want to persist the view (fingerprints or active executions) + // for statement and transactions pages, and also restore the value + // when coming from sessions tab. + params.set("view", currentView ?? restoreStmtsViewParam ?? ""); + } props.history.push({ - search: new URLSearchParams({ tab: tabId }).toString(), + search: params.toString(), }); }; From c3c1541330a6e975bfacaae9dd0795ac4ce41a0b Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Wed, 6 Jul 2022 11:09:59 -0400 Subject: [PATCH 2/5] ui/cluster-ui: filter out closed sessions from active exec pages Previously, it was possible for the active transactions page to show txns from closed sessions. The sessions API was recently updated to return closed sessions, and it is possible for the active_txn field in a closed session to be populated. This commit filters out the closed sessions when retrieving active transactions. 
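As an illustration only (a simplified sketch, not the cluster-ui code in the diff below; the `Session` shape and names here are invented), the fix amounts to dropping CLOSED sessions before deriving active transactions:

```
// Sketch: a session only contributes to the active transactions page if it
// is not CLOSED and actually has an active transaction attached.
enum SessionStatus {
  ACTIVE = 0,
  CLOSED = 1,
}

interface Session {
  status: SessionStatus;
  active_txn?: { id: string };
}

function activeTxnsFromSessions(sessions: Session[]): { id: string }[] {
  return sessions
    .filter(s => s.status !== SessionStatus.CLOSED && s.active_txn != null)
    .map(s => s.active_txn!);
}

// Example: only the open session's transaction survives the filter.
const sessions: Session[] = [
  { status: SessionStatus.ACTIVE, active_txn: { id: "txn-1" } },
  { status: SessionStatus.CLOSED, active_txn: { id: "txn-2" } },
];
console.log(activeTxnsFromSessions(sessions)); // [ { id: "txn-1" } ]
```

The real change (in `getActiveTransactionsFromSessions` and `getActiveStatementsFromSessions` below) does the same thing against the protobuf-derived `SessionStatusType`.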
Release note (bug fix): active transactions page no longer shows transactions from closed sessions --- .../activeStatementUtils.spec.ts | 21 +++++++++++++++++++ .../activeExecutions/activeStatementUtils.ts | 7 ++++++- .../cluster-ui/src/activeExecutions/types.ts | 3 +++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.spec.ts b/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.spec.ts index 5bc7b2df9fc5..584a5e836ed4 100644 --- a/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.spec.ts +++ b/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.spec.ts @@ -13,6 +13,7 @@ import { SessionsResponse, ActiveTransaction, ActiveStatement, + SessionStatusType, } from "./types"; import * as protos from "@cockroachlabs/crdb-protobuf-client"; import moment from "moment"; @@ -185,6 +186,14 @@ describe("test activeStatementUtils", () => { client_address: "clientAddress2", active_queries: activeQueries, }, + { + id: new Uint8Array(), + username: "foo", + status: SessionStatusType.CLOSED, + application_name: "application2", + client_address: "clientAddress2", + active_queries: activeQueries, + }, ], errors: [], internal_app_name_prefix: "", @@ -270,6 +279,15 @@ describe("test activeStatementUtils", () => { active_queries: [makeActiveQuery()], active_txn: txns[1], }, + { + id: new Uint8Array(), + username: "foo", + status: SessionStatusType.CLOSED, + application_name: "closed_application", + client_address: "clientAddress2", + active_queries: [makeActiveQuery()], + active_txn: txns[1], + }, ], errors: [], internal_app_name_prefix: "", @@ -281,6 +299,9 @@ describe("test activeStatementUtils", () => { LAST_UPDATED, ); + // Should filter out the txn from closed session. 
+ expect(activeTransactions.length).toBe(2); + expect(activeTransactions.length).toBe(txns.length); activeTransactions.forEach((txn: ActiveTransaction, i) => { diff --git a/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.ts b/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.ts index baaedacf46e8..dd016bcd8b02 100644 --- a/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.ts +++ b/pkg/ui/workspaces/cluster-ui/src/activeExecutions/activeStatementUtils.ts @@ -17,6 +17,7 @@ import { ActiveStatementPhase, ExecutionStatus, ActiveTransactionFilters, + SessionStatusType, } from "./types"; import { ActiveStatement, ActiveStatementFilters } from "./types"; @@ -54,6 +55,7 @@ export function getActiveStatementsFromSessions( const time = lastUpdated || moment.utc(); sessionsResponse.sessions.forEach(session => { + if (session.status === SessionStatusType.CLOSED) return; session.active_queries.forEach(query => { activeQueries.push({ executionID: query.id, @@ -140,7 +142,10 @@ export function getActiveTransactionsFromSessions( }); return sessionsResponse.sessions - .filter(session => session.active_txn) + .filter( + session => + session.status !== SessionStatusType.CLOSED && session.active_txn, + ) .map(session => { const activeTxn = session.active_txn; diff --git a/pkg/ui/workspaces/cluster-ui/src/activeExecutions/types.ts b/pkg/ui/workspaces/cluster-ui/src/activeExecutions/types.ts index 04c34972f612..c34901d9bc5d 100644 --- a/pkg/ui/workspaces/cluster-ui/src/activeExecutions/types.ts +++ b/pkg/ui/workspaces/cluster-ui/src/activeExecutions/types.ts @@ -17,8 +17,11 @@ export type SessionsResponse = export type ActiveStatementResponse = protos.cockroach.server.serverpb.ActiveQuery; export type ExecutionStatus = "Waiting" | "Executing" | "Preparing"; + export const ActiveStatementPhase = protos.cockroach.server.serverpb.ActiveQuery.Phase; +export const SessionStatusType = + protos.cockroach.server.serverpb.Session.Status; export type ActiveStatement = { executionID: string; From f422badd1fb5e9b3899bc07d9a3a43cb8ef0af11 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 28 Jun 2022 14:20:31 -0700 Subject: [PATCH 3/5] stmtdiagnostics: remove the usage of gossip This commit removes the usage of gossip in the stmt diagnostics feature since it is really an optimization (i.e not required for the feature to work) and is in a way of the architecture unification (i.e. multi-tenancy effort). The gossip was previously used to let all nodes in the cluster know about the new stmt diagnostics request or about the cancellation of an existing one (in fact, the gossip propagation of the latter was broken anyway since we forgot to register the corresponding callback). We now rely on the polling mechanism on each node to read from the system table to populate the registry of current requests. This polling takes place every `sql.stmt_diagnostics.poll_interval` seconds (10 by default). Additionally, this commit changes the class of that setting from TenantWritable to TenantReadOnly because 1. we don't charge the user for the polling 2. to prevent the abuse where a tenant could set the polling interval to a very low value incurring no costs to themselves (we excluded the polling from the tenant cost recently). The follow-up work remains to bring the optimization of propagating new requests quickly using a range feed on the system table. 
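For illustration only (a minimal, self-contained sketch on the Go standard library, not the actual `Registry.poll` implementation; the names below are invented), the polling loop amounts to re-reading the requests table on a timer and resetting that timer whenever the interval setting changes:

```
package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop re-reads pending diagnostics requests on a fixed interval and
// resets its timer when the interval setting changes. A zero interval
// disables polling until a non-zero value arrives. Sketch only; not the
// real stmtdiagnostics.Registry.
func pollLoop(
	ctx context.Context,
	interval time.Duration,
	intervalChanged <-chan time.Duration,
	pollRequests func(context.Context) error,
) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case interval = <-intervalChanged:
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			if interval > 0 {
				timer.Reset(interval)
			}
		case <-timer.C:
			if err := pollRequests(ctx); err != nil {
				fmt.Println("error polling for diagnostics requests:", err)
			}
			if interval > 0 {
				timer.Reset(interval)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	pollLoop(ctx, time.Second, make(chan time.Duration), func(context.Context) error {
		fmt.Println("reading system.statement_diagnostics_requests")
		return nil
	})
}
```

With gossip gone, the worst-case delay before a node notices a new or canceled request is one polling interval, which is why the test change below lowers `sql.stmt_diagnostics.poll_interval` to speed things up.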
Release note: None --- pkg/gossip/keys.go | 14 --- pkg/server/server_sql.go | 1 - pkg/sql/conn_executor_internal_test.go | 2 +- pkg/sql/stmtdiagnostics/BUILD.bazel | 2 - .../stmtdiagnostics/statement_diagnostics.go | 107 ++---------------- .../statement_diagnostics_test.go | 4 + 6 files changed, 14 insertions(+), 116 deletions(-) diff --git a/pkg/gossip/keys.go b/pkg/gossip/keys.go index 5388a4476f9e..34a2a5a53300 100644 --- a/pkg/gossip/keys.go +++ b/pkg/gossip/keys.go @@ -88,20 +88,6 @@ const ( // client connections a node has open. This is used by other nodes in the // cluster to build a map of the gossip network. KeyGossipClientsPrefix = "gossip-clients" - - // KeyGossipStatementDiagnosticsRequest is the gossip key for new statement - // diagnostics requests. The values is the id of the request that generated - // the notification, as a little-endian-encoded uint64. - // stmtDiagnosticsRequestRegistry listens for notifications and responds by - // polling for new requests. - KeyGossipStatementDiagnosticsRequest = "stmt-diag-req" - - // KeyGossipStatementDiagnosticsRequestCancellation is the gossip key for - // canceling an existing diagnostics request. The values is the id of the - // request that needs to be canceled, as a little-endian-encoded uint64. - // stmtDiagnosticsRequestRegistry listens for notifications and responds by - // polling for new requests. - KeyGossipStatementDiagnosticsRequestCancellation = "stmt-diag-cancel-req" ) // MakeKey creates a canonical key under which to gossip a piece of diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 8940ab9e4d0c..a5b6b2938934 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -953,7 +953,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry( cfg.circularInternalExecutor, cfg.db, - cfg.gossip, cfg.Settings, ) execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 29af67f84164..7454ea246b31 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -330,7 +330,7 @@ func startConnExecutor( ), QueryCache: querycache.New(0), TestingKnobs: ExecutorTestingKnobs{}, - StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, gw, st), + StmtDiagnosticsRecorder: stmtdiagnostics.NewRegistry(nil, nil, st), HistogramWindowInterval: base.DefaultHistogramWindowInterval(), CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec), } diff --git a/pkg/sql/stmtdiagnostics/BUILD.bazel b/pkg/sql/stmtdiagnostics/BUILD.bazel index 357491207ce3..8c506684964e 100644 --- a/pkg/sql/stmtdiagnostics/BUILD.bazel +++ b/pkg/sql/stmtdiagnostics/BUILD.bazel @@ -7,10 +7,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", - "//pkg/gossip", "//pkg/kv", "//pkg/multitenant", - "//pkg/roachpb", "//pkg/security/username", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index ac4178b4aef7..a6843cb15049 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -12,16 +12,13 @@ package stmtdiagnostics import ( "context" - "encoding/binary" "fmt" "math/rand" "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/gossip" 
"github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/multitenant" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -38,7 +35,7 @@ import ( ) var pollingInterval = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.TenantReadOnly, "sql.stmt_diagnostics.poll_interval", "rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable", 10*time.Second) @@ -80,19 +77,9 @@ type Registry struct { rand *rand.Rand } - st *cluster.Settings - ie sqlutil.InternalExecutor - db *kv.DB - gossip gossip.OptionalGossip - - // gossipUpdateChan is used to notify the polling loop that a diagnostics - // request has been added. The gossip callback will not block sending on this - // channel. - gossipUpdateChan chan RequestID - // gossipCancelChan is used to notify the polling loop that a diagnostics - // request has been canceled. The gossip callback will not block sending on - // this channel. - gossipCancelChan chan RequestID + st *cluster.Settings + ie sqlutil.InternalExecutor + db *kv.DB } // Request describes a statement diagnostics request along with some conditional @@ -113,25 +100,13 @@ func (r *Request) isConditional() bool { } // NewRegistry constructs a new Registry. -func NewRegistry( - ie sqlutil.InternalExecutor, db *kv.DB, gw gossip.OptionalGossip, st *cluster.Settings, -) *Registry { +func NewRegistry(ie sqlutil.InternalExecutor, db *kv.DB, st *cluster.Settings) *Registry { r := &Registry{ - ie: ie, - db: db, - gossip: gw, - gossipUpdateChan: make(chan RequestID, 1), - gossipCancelChan: make(chan RequestID, 1), - st: st, + ie: ie, + db: db, + st: st, } r.mu.rand = rand.New(rand.NewSource(timeutil.Now().UnixNano())) - - // Some tests pass a nil gossip, and gossip is not available on SQL tenant - // servers. - g, ok := gw.Optional(47893) - if ok && g != nil { - g.RegisterCallback(gossip.KeyGossipStatementDiagnosticsRequest, r.gossipNotification) - } return r } @@ -187,17 +162,6 @@ func (r *Registry) poll(ctx context.Context) { select { case <-pollIntervalChanged: continue // go back around and maybe reset the timer - case reqID := <-r.gossipUpdateChan: - if r.findRequest(reqID) { - continue // request already exists, don't do anything - } - // Poll the data. - case reqID := <-r.gossipCancelChan: - r.cancelRequest(reqID) - // No need to poll the data (unlike above) because we don't have to - // read anything of the system table to remove the request from the - // registry. - continue case <-timer.C: timer.Read = true case <-ctx.Done(): @@ -241,12 +205,6 @@ func (r *Registry) addRequestInternalLocked( } } -func (r *Registry) findRequest(requestID RequestID) bool { - r.mu.Lock() - defer r.mu.Unlock() - return r.findRequestLocked(requestID) -} - // findRequestLocked returns whether the request already exists. If the request // is not ongoing and has already expired, it is removed from the registry (yet // true is still returned). 
@@ -291,11 +249,6 @@ func (r *Registry) insertRequestInternal( minExecutionLatency time.Duration, expiresAfter time.Duration, ) (RequestID, error) { - g, err := r.gossip.OptionalErr(48274) - if err != nil { - return 0, err - } - isSamplingProbabilitySupported := r.st.Version.IsActive(ctx, clusterversion.SampledStmtDiagReqs) if !isSamplingProbabilitySupported && samplingProbability != 0 { return 0, errors.New( @@ -315,7 +268,7 @@ func (r *Registry) insertRequestInternal( var reqID RequestID var expiresAt time.Time - err = r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Check if there's already a pending request for this fingerprint. row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn, sessiondata.InternalExecutorOverride{ @@ -393,23 +346,11 @@ func (r *Registry) insertRequestInternal( r.addRequestInternalLocked(ctx, reqID, stmtFingerprint, samplingProbability, minExecutionLatency, expiresAt) }() - // Notify all the other nodes that they have to poll. - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(reqID)) - if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequest, buf, 0 /* ttl */); err != nil { - log.Warningf(ctx, "error notifying of diagnostics request: %s", err) - } - return reqID, nil } // CancelRequest is part of the server.StmtDiagnosticsRequester interface. func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error { - g, err := r.gossip.OptionalErr(48274) - if err != nil { - return err - } - row, err := r.ie.QueryRowEx(ctx, "stmt-diag-cancel-request", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), @@ -435,13 +376,6 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error { reqID := RequestID(requestID) r.cancelRequest(reqID) - // Notify all the other nodes that this request has been canceled. - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, uint64(reqID)) - if err := g.AddInfo(gossip.KeyGossipStatementDiagnosticsRequestCancellation, buf, 0 /* ttl */); err != nil { - log.Warningf(ctx, "error notifying of diagnostics request cancellation: %s", err) - } - return nil } @@ -732,26 +666,3 @@ func (r *Registry) pollRequests(ctx context.Context) error { } return nil } - -// gossipNotification is called in response to a gossip update informing us that -// we need to poll. -func (r *Registry) gossipNotification(s string, value roachpb.Value) { - switch s { - case gossip.KeyGossipStatementDiagnosticsRequest: - select { - case r.gossipUpdateChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)): - default: - // Don't pile up on these requests and don't block gossip. - } - case gossip.KeyGossipStatementDiagnosticsRequestCancellation: - select { - case r.gossipCancelChan <- RequestID(binary.LittleEndian.Uint64(value.RawBytes)): - default: - // Don't pile up on these requests and don't block gossip. - } - default: - // We don't expect any other notifications. Perhaps in a future version - // we added other keys with the same prefix. 
- return - } -} diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go index 32c49cef8ea8..f129c1a51411 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics_test.go @@ -317,6 +317,10 @@ func TestDiagnosticsRequestDifferentNode(t *testing.T) { _, err := db0.Exec("CREATE TABLE test (x int PRIMARY KEY)") require.NoError(t, err) + // Lower the polling interval to speed up the test. + _, err = db0.Exec("SET CLUSTER SETTING sql.stmt_diagnostics.poll_interval = '1ms'") + require.NoError(t, err) + var minExecutionLatency, expiresAfter time.Duration var samplingProbability float64 From 35e502e95bcac1ad26118a36469d82dccb158f5b Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 6 Jul 2022 09:11:15 -0700 Subject: [PATCH 4/5] kvstreamer: introduce a single range fast path for request truncation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces a fast path to avoid the usage of the batch truncation helper when all requests are contained within a single range. Some modifications needed to be made to the `txnKVStreamer` so that it didn't nil out the requests slice - we now delay that until right before the next call to `Enqueue`. ``` ame old time/op new time/op delta IndexJoin/Cockroach-24 6.21ms ± 1% 5.96ms ± 2% -4.08% (p=0.000 n=8+10) IndexJoinColumnFamilies/Cockroach-24 8.97ms ± 4% 8.79ms ± 7% ~ (p=0.190 n=10+10) name old alloc/op new alloc/op delta IndexJoin/Cockroach-24 1.39MB ± 1% 1.27MB ± 1% -7.97% (p=0.000 n=10+10) IndexJoinColumnFamilies/Cockroach-24 1.46MB ± 1% 1.34MB ± 0% -8.04% (p=0.000 n=9+7) name old allocs/op new allocs/op delta IndexJoin/Cockroach-24 7.20k ± 1% 7.16k ± 1% -0.61% (p=0.022 n=10+10) IndexJoinColumnFamilies/Cockroach-24 12.0k ± 1% 11.9k ± 0% -0.83% (p=0.000 n=9+8) ``` Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 60 ++++++++++++++++---------- pkg/sql/row/kv_batch_streamer.go | 26 +++++++---- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index dfe7899cbd94..dae4335da481 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -503,35 +503,51 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re s.mu.Unlock() } }() - // TODO(yuzefovich): reuse truncation helpers between different Enqueue() - // calls. - // TODO(yuzefovich): introduce a fast path when all requests are contained - // within a single range. - // The streamer can process the responses in an arbitrary order, so we don't - // require the helper to preserve the order of requests and allow it to - // reorder the reqs slice too. - const mustPreserveOrder = false - const canReorderRequestsSlice = true - truncationHelper, err := kvcoord.MakeBatchTruncationHelper( - scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, - ) - if err != nil { - return err + allRequestsAreWithinSingleRange := !ri.NeedAnother(rs) + var truncationHelper kvcoord.BatchTruncationHelper + if !allRequestsAreWithinSingleRange { + // We only need the truncation helper if the requests span multiple + // ranges. + // + // The streamer can process the responses in an arbitrary order, so we + // don't require the helper to preserve the order of requests and allow + // it to reorder the reqs slice too. 
+ const mustPreserveOrder = false + const canReorderRequestsSlice = true + // TODO(yuzefovich): reuse truncation helpers between different + // Enqueue() calls. + truncationHelper, err = kvcoord.MakeBatchTruncationHelper( + scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, + ) + if err != nil { + return err + } } var reqsKeysScratch []roachpb.Key var newNumRangesPerScanRequestMemoryUsage int64 for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) { - // Truncate the request span to the current range. - singleRangeSpan, err := rs.Intersect(ri.Token().Desc()) - if err != nil { - return err - } // Find all requests that touch the current range. var singleRangeReqs []roachpb.RequestUnion var positions []int - singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan) - if err != nil { - return err + if allRequestsAreWithinSingleRange { + // All requests are within this range, so we can just use the + // enqueued requests directly. + singleRangeReqs = reqs + positions = make([]int, len(reqs)) + for i := range positions { + positions[i] = i + } + seekKey = roachpb.RKeyMax + } else { + // Truncate the request span to the current range. + singleRangeSpan, err := rs.Intersect(ri.Token().Desc()) + if err != nil { + return err + } + singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan) + if err != nil { + return err + } } rs.Key = seekKey var subRequestIdx []int32 diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index d3451972d0fd..63ae2222504b 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -91,20 +91,30 @@ func (f *txnKVStreamer) SetupNextFetch( if log.ExpensiveLogEnabled(ctx, 2) { log.VEventf(ctx, 2, "Scan %s", spans) } - reqs := spansToRequests(spans, false /* reverse */, f.keyLocking, f.reqsScratch) + // Make sure to nil out the requests past the length that will be used in + // spansToRequests so that we lose references to the underlying Get and Scan + // requests (which could keep large byte slices alive) from the previous + // iteration. + // + // Note that we could not do this nil-ing out after Enqueue() returned on + // the previous iteration because in some cases the streamer will hold on to + // the slice (which is the case when the requests are contained within a + // single range). At the same time we don't want to push the responsibility + // of nil-ing the slice out because we (i.e. the txnKVStreamer) are the ones + // that keep the slice for reuse, and the streamer doesn't know anything + // about the slice reuse. + reqsScratch := f.reqsScratch[:cap(f.reqsScratch)] + for i := len(spans); i < len(reqsScratch); i++ { + reqsScratch[i] = roachpb.RequestUnion{} + } + reqs := spansToRequests(spans, false /* reverse */, f.keyLocking, reqsScratch) if err := f.streamer.Enqueue(ctx, reqs); err != nil { return err } f.spans = spans f.spanIDs = spanIDs - // Keep the reference to the requests slice in order to reuse in the future - // after making sure to nil out the requests in order to lose references to - // the underlying Get and Scan requests which could keep large byte slices - // alive. + // Keep the reference to the requests slice in order to reuse in the future. 
f.reqsScratch = reqs - for i := range f.reqsScratch { - f.reqsScratch[i] = roachpb.RequestUnion{} - } reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch)) return f.acc.ResizeTo(ctx, reqsScratchMemUsage) } From 75a18a86e74d7f996011881203edf8107b3decbc Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 6 Jul 2022 11:34:46 -0700 Subject: [PATCH 5/5] kvcoord: refactor the truncation helper for reuse This commit refactors the batch truncation helper so that it can be reused for multiple batches of requests. In particular, that ability is now utilized by the streamer. Additionally, since the streamer now holds on to the same truncation helper for possibly a long time, this commit adds the memory accounting for the internal state of the helper. Release note: None --- pkg/kv/kvclient/kvcoord/batch.go | 166 +++++++++++++++++++------ pkg/kv/kvclient/kvcoord/batch_test.go | 12 +- pkg/kv/kvclient/kvcoord/dist_sender.go | 2 +- pkg/kv/kvclient/kvstreamer/streamer.go | 37 ++++-- 4 files changed, 162 insertions(+), 55 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/batch.go b/pkg/kv/kvclient/kvcoord/batch.go index a70006af3873..100721d0701f 100644 --- a/pkg/kv/kvclient/kvcoord/batch.go +++ b/pkg/kv/kvclient/kvcoord/batch.go @@ -12,6 +12,7 @@ package kvcoord import ( "sort" + "unsafe" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -34,7 +35,7 @@ import ( // // All requests fit within a single range, don't use the helper. // ... // } -// helper := MakeBatchTruncationHelper(scanDir, requests) +// helper := NewBatchTruncationHelper(scanDir, requests) // for ri.Valid() { // curRangeRS := rs.Intersect(ri.Token().Desc()) // curRangeReqs, positions, seekKey := helper.Truncate(curRangeRS) @@ -80,9 +81,21 @@ type BatchTruncationHelper struct { // requests are the original requests this helper needs to process (possibly // in non-original order). requests []roachpb.RequestUnion + // ownRequestsSlice indicates whether a separate slice was allocated for + // requests. It is used for the purposes of the memory accounting. + // + // It is the same as !canReorderRequestsSlice in most cases, except for when + // the local keys are present. In such a scenario, even if + // canReorderRequestsSlice is false, ownRequestsSlice might remain false. + ownRequestsSlice bool // mustPreserveOrder indicates whether the requests must be returned by // Truncate() in the original order. mustPreserveOrder bool + // canReorderRequestsSlice indicates whether the helper will hold on to the + // given slice of requests and might reorder the requests within it + // (although each request will not be modified "deeply" - i.e. its header + // won't be updated or anything like that). + canReorderRequestsSlice bool // foundLocalKey, if true, indicates whether some of the requests reference // the local keys. When true, the helper falls back to the legacy methods. foundLocalKey bool @@ -154,8 +167,9 @@ func (h descBatchTruncationHelper) Less(i, j int) bool { return h.headers[i].EndKey.Compare(h.headers[j].EndKey) > 0 } -// MakeBatchTruncationHelper returns a new BatchTruncationHelper for the given -// requests. +// NewBatchTruncationHelper returns a new BatchTruncationHelper for the given +// requests. The helper can be reused later for a different set of requests via +// a separate Init() call. // // mustPreserveOrder, if true, indicates that the caller requires that requests // are returned by Truncate() in the original order (i.e. 
with strictly @@ -166,67 +180,141 @@ func (h descBatchTruncationHelper) Less(i, j int) bool { // not be modified "deeply" - i.e. its header won't be updated or anything like // that). Set it to false when the caller cares about the slice not being // mutated in any way. -func MakeBatchTruncationHelper( +func NewBatchTruncationHelper( scanDir ScanDirection, requests []roachpb.RequestUnion, mustPreserveOrder bool, canReorderRequestsSlice bool, -) (BatchTruncationHelper, error) { - var ret BatchTruncationHelper - ret.scanDir = scanDir - ret.requests = requests - ret.mustPreserveOrder = mustPreserveOrder +) (*BatchTruncationHelper, error) { + ret := &BatchTruncationHelper{ + scanDir: scanDir, + mustPreserveOrder: mustPreserveOrder, + canReorderRequestsSlice: canReorderRequestsSlice, + } + return ret, ret.Init(requests) +} + +// Init sets up the helper for the provided requests. It can be called multiple +// times, and it will reuse as much internal allocations as possible. +func (h *BatchTruncationHelper) Init(requests []roachpb.RequestUnion) error { // Determine whether we can use the optimized strategy before making any // allocations. + h.foundLocalKey = false for i := range requests { header := requests[i].GetInner().Header() if keys.IsLocal(header.Key) { - ret.foundLocalKey = true - return ret, nil + h.requests = requests + h.foundLocalKey = true + return nil } } // We can use the optimized strategy, so set up all of the internal state. - if !canReorderRequestsSlice { + h.startIdx = 0 + if h.canReorderRequestsSlice { + h.requests = requests + } else { // If we can't reorder the original requests slice, we must make a copy. - ret.requests = make([]roachpb.RequestUnion, len(requests)) - copy(ret.requests, requests) + if cap(h.requests) < len(requests) { + h.requests = make([]roachpb.RequestUnion, len(requests)) + h.ownRequestsSlice = true + } else { + if len(requests) < len(h.requests) { + // Ensure that we lose references to the old requests that will + // not be overwritten by copy. + // + // Note that we only need to go up to the number of old requests + // and not the capacity of the slice since we assume that + // everything past the length is already nil-ed out. + oldRequests := h.requests[len(requests):len(h.requests)] + for i := range oldRequests { + oldRequests[i] = roachpb.RequestUnion{} + } + } + h.requests = h.requests[:len(requests)] + } + copy(h.requests, requests) + } + if cap(h.headers) < len(requests) { + h.headers = make([]roachpb.RequestHeader, len(requests)) + } else { + if len(requests) < len(h.headers) { + // Ensure that we lose references to the old header that will + // not be overwritten in the loop below. + // + // Note that we only need to go up to the number of old headers and + // not the capacity of the slice since we assume that everything + // past the length is already nil-ed out. 
+ oldHeaders := h.headers[len(requests):len(h.headers)] + for i := range oldHeaders { + oldHeaders[i] = roachpb.RequestHeader{} + } + } + h.headers = h.headers[:len(requests)] + } + if cap(h.positions) < len(requests) { + h.positions = make([]int, len(requests)) + } else { + h.positions = h.positions[:len(requests)] + } + if cap(h.isRange) < len(requests) { + h.isRange = make([]bool, len(requests)) + } else { + h.isRange = h.isRange[:len(requests)] } - ret.headers = make([]roachpb.RequestHeader, len(requests)) - ret.positions = make([]int, len(requests)) - ret.isRange = make([]bool, len(requests)) // Populate the internal state as well as perform some sanity checks on the // requests. for i := range requests { req := requests[i].GetInner() - ret.headers[i] = req.Header() - ret.positions[i] = i - ret.isRange[i] = roachpb.IsRange(req) - if ret.isRange[i] { + h.headers[i] = req.Header() + h.positions[i] = i + h.isRange[i] = roachpb.IsRange(req) + if h.isRange[i] { // We're dealing with a range-spanning request. - if l, r := keys.IsLocal(ret.headers[i].Key), keys.IsLocal(ret.headers[i].EndKey); (l && !r) || (!l && r) { - return BatchTruncationHelper{}, errors.AssertionFailedf("local key mixed with global key in range") + if l, r := keys.IsLocal(h.headers[i].Key), keys.IsLocal(h.headers[i].EndKey); (l && !r) || (!l && r) { + return errors.AssertionFailedf("local key mixed with global key in range") } - } else if len(ret.headers[i].EndKey) > 0 { - return BatchTruncationHelper{}, errors.AssertionFailedf("%T is not a range command, but EndKey is set", req) + } else if len(h.headers[i].EndKey) > 0 { + return errors.AssertionFailedf("%T is not a range command, but EndKey is set", req) } } - if scanDir == Ascending { - sort.Sort(ascBatchTruncationHelper{BatchTruncationHelper: &ret}) + if h.scanDir == Ascending { + sort.Sort(ascBatchTruncationHelper{BatchTruncationHelper: h}) } else { // With the Descending scan direction, we have to convert all point // requests into range-spanning requests that include only a single // point. - for i := range ret.headers { - if len(ret.headers[i].EndKey) == 0 { - ret.headers[i].EndKey = ret.headers[i].Key.Next() + for i := range h.headers { + if len(h.headers[i].EndKey) == 0 { + h.headers[i].EndKey = h.headers[i].Key.Next() } } - sort.Sort(descBatchTruncationHelper{BatchTruncationHelper: &ret}) + sort.Sort(descBatchTruncationHelper{BatchTruncationHelper: h}) } - if ret.mustPreserveOrder { - ret.helper.init(len(requests)) + if h.mustPreserveOrder { + h.helper.init(len(requests)) } - return ret, nil + return nil +} + +const ( + requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{})) + requestHeaderOverhead = int64(unsafe.Sizeof(roachpb.RequestHeader{})) + intOverhead = int64(unsafe.Sizeof(int(0))) + boolOverhead = int64(unsafe.Sizeof(false)) +) + +// MemUsage returns the memory usage of the internal state of the helper. +func (h *BatchTruncationHelper) MemUsage() int64 { + var memUsage int64 + if h.ownRequestsSlice { + // Only account for the requests slice if we own it. 
+ memUsage += int64(cap(h.requests)) * requestUnionOverhead + } + memUsage += int64(cap(h.headers)) * requestHeaderOverhead + memUsage += int64(cap(h.positions)) * intOverhead + memUsage += int64(cap(h.isRange)) * boolOverhead + memUsage += h.helper.memUsage() + return memUsage } // Truncate restricts all requests to the given key range and returns new, @@ -906,12 +994,20 @@ type orderRestorationHelper struct { } func (h *orderRestorationHelper) init(numOriginalRequests int) { - h.found = make([]int, numOriginalRequests) + if cap(h.found) < numOriginalRequests { + h.found = make([]int, numOriginalRequests) + } else { + h.found = h.found[:numOriginalRequests] + } for i := range h.found { h.found[i] = -1 } } +func (h *orderRestorationHelper) memUsage() int64 { + return int64(cap(h.scratch))*requestUnionOverhead + int64(cap(h.found))*intOverhead +} + // restoreOrder reorders truncReqs in the ascending order of the corresponding // positions values. // diff --git a/pkg/kv/kvclient/kvcoord/batch_test.go b/pkg/kv/kvclient/kvcoord/batch_test.go index 4227a8d0699b..76dce95f7891 100644 --- a/pkg/kv/kvclient/kvcoord/batch_test.go +++ b/pkg/kv/kvclient/kvcoord/batch_test.go @@ -210,9 +210,9 @@ func TestBatchPrevNext(t *testing.T) { } const mustPreserveOrder = false const canReorderRequestsSlice = false - ascHelper, err := MakeBatchTruncationHelper(Ascending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice) + ascHelper, err := NewBatchTruncationHelper(Ascending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice) require.NoError(t, err) - descHelper, err := MakeBatchTruncationHelper(Descending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice) + descHelper, err := NewBatchTruncationHelper(Descending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice) require.NoError(t, err) if _, _, next, err := ascHelper.Truncate( roachpb.RSpan{ @@ -396,10 +396,10 @@ func TestTruncate(t *testing.T) { original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) } - var truncationHelper BatchTruncationHelper + var truncationHelper *BatchTruncationHelper if !isLegacy { var err error - truncationHelper, err = MakeBatchTruncationHelper( + truncationHelper, err = NewBatchTruncationHelper( Ascending, original.Requests, mustPreserveOrder, canReorderRequestsSlice, ) if err != nil { @@ -562,7 +562,7 @@ func TestTruncateLoop(t *testing.T) { for _, mustPreserveOrder := range []bool{false, true} { t.Run(fmt.Sprintf("run=%d/%s/order=%t", numRuns, scanDir, mustPreserveOrder), func(t *testing.T) { const canReorderRequestsSlice = false - helper, err := MakeBatchTruncationHelper( + helper, err := NewBatchTruncationHelper( scanDir, requests, mustPreserveOrder, canReorderRequestsSlice, ) require.NoError(t, err) @@ -698,7 +698,7 @@ func BenchmarkTruncateLoop(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { const canReorderRequestsSlice = false - h, err := MakeBatchTruncationHelper( + h, err := NewBatchTruncationHelper( scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, ) require.NoError(b, err) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 0e3d0d4e28f9..7d3fcd8964c6 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1339,7 +1339,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( // TODO(yuzefovich): refactor the DistSender so that the truncation helper // could reorder requests as it pleases. 
const canReorderRequestsSlice = false - truncationHelper, err := MakeBatchTruncationHelper( + truncationHelper, err := NewBatchTruncationHelper( scanDir, ba.Requests, mustPreserveOrder, canReorderRequestsSlice, ) if err != nil { diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index dae4335da481..d33c54a6e378 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -230,6 +230,12 @@ type Streamer struct { waitGroup sync.WaitGroup + truncationHelper *kvcoord.BatchTruncationHelper + // truncationHelperAccountedFor tracks how much space has been consumed from + // the budget in order to account for the memory usage of the truncation + // helper. + truncationHelperAccountedFor int64 + // requestsToServe contains all single-range sub-requests that have yet // to be served. requestsToServe requestsProvider @@ -504,21 +510,21 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re } }() allRequestsAreWithinSingleRange := !ri.NeedAnother(rs) - var truncationHelper kvcoord.BatchTruncationHelper if !allRequestsAreWithinSingleRange { // We only need the truncation helper if the requests span multiple // ranges. - // - // The streamer can process the responses in an arbitrary order, so we - // don't require the helper to preserve the order of requests and allow - // it to reorder the reqs slice too. - const mustPreserveOrder = false - const canReorderRequestsSlice = true - // TODO(yuzefovich): reuse truncation helpers between different - // Enqueue() calls. - truncationHelper, err = kvcoord.MakeBatchTruncationHelper( - scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, - ) + if s.truncationHelper == nil { + // The streamer can process the responses in an arbitrary order, so + // we don't require the helper to preserve the order of requests and + // allow it to reorder the reqs slice too. + const mustPreserveOrder = false + const canReorderRequestsSlice = true + s.truncationHelper, err = kvcoord.NewBatchTruncationHelper( + scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, + ) + } else { + err = s.truncationHelper.Init(reqs) + } if err != nil { return err } @@ -544,7 +550,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re if err != nil { return err } - singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan) + singleRangeReqs, positions, seekKey, err = s.truncationHelper.Truncate(singleRangeSpan) if err != nil { return err } @@ -634,6 +640,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re } toConsume := totalReqsMemUsage + if !allRequestsAreWithinSingleRange { + accountedFor := s.truncationHelperAccountedFor + s.truncationHelperAccountedFor = s.truncationHelper.MemUsage() + toConsume += s.truncationHelperAccountedFor - accountedFor + } if newNumRangesPerScanRequestMemoryUsage != 0 && newNumRangesPerScanRequestMemoryUsage != s.numRangesPerScanRequestAccountedFor { toConsume += newNumRangesPerScanRequestMemoryUsage - s.numRangesPerScanRequestAccountedFor s.numRangesPerScanRequestAccountedFor = newNumRangesPerScanRequestMemoryUsage