From cb938ed462f96ecb74f55c6cdd8f9ea91eb44edb Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Fri, 4 Nov 2022 17:47:15 -0400 Subject: [PATCH 1/5] tenant: add apiv2 support for sql-over-http This commit adds partial support for the `/api/v2` HTTP server on tenants. Currently, we *only* support the SQL endpoint since this functionality is needed by newer DB Console features. The API V2 Server initialization no longer relies on the global `Server` object, and only requires a `SQLServer` instead which makes it easier to play nicely with tenants. However, we still have the problem of incompatible status and admin servers which are in use on other endpoints. Future work will unify the tenant scoped versions of these servers and allow them to be used here, enabling full API V2 compatibility on tenants. Release note (ops change): sql tenants now support the HTTP endpoint under `/api/v2/sql` which allows the caller to send an HTTP request containing SQL statements to execute. The JSON response contains the results. This endpoint works identically to how it does on a non-tenant server, except that it naturally scopes to the target tenant for SQL execution. 
Epic: CRDB-17356 --- pkg/ccl/serverccl/BUILD.bazel | 3 + pkg/ccl/serverccl/api_v2_tenant_test.go | 88 +++++ pkg/ccl/serverccl/tenant_test_utils.go | 4 + pkg/ccl/serverccl/testdata/api_v2_sql | 500 ++++++++++++++++++++++++ pkg/server/api_v2.go | 79 ++-- pkg/server/api_v2_auth.go | 7 +- pkg/server/api_v2_sql.go | 18 +- pkg/server/api_v2_sql_test.go | 4 +- pkg/server/server.go | 21 +- pkg/server/tenant.go | 8 +- pkg/util/httputil/http.go | 19 + 11 files changed, 699 insertions(+), 52 deletions(-) create mode 100644 pkg/ccl/serverccl/api_v2_tenant_test.go create mode 100644 pkg/ccl/serverccl/testdata/api_v2_sql diff --git a/pkg/ccl/serverccl/BUILD.bazel b/pkg/ccl/serverccl/BUILD.bazel index 3bdfc9d41c9c..6c0a7538ace5 100644 --- a/pkg/ccl/serverccl/BUILD.bazel +++ b/pkg/ccl/serverccl/BUILD.bazel @@ -33,6 +33,7 @@ go_test( size = "enormous", srcs = [ "admin_test.go", + "api_v2_tenant_test.go", "chart_catalog_test.go", "main_test.go", "role_authentication_test.go", @@ -41,6 +42,7 @@ go_test( "tenant_vars_test.go", ], args = ["-test.timeout=3595s"], + data = glob(["testdata/**"]), embed = [":serverccl"], deps = [ "//pkg/base", @@ -78,6 +80,7 @@ go_test( "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/timeutil", + "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_elastic_gosigar//:gosigar", "@com_github_lib_pq//:pq", "@com_github_prometheus_client_model//go", diff --git a/pkg/ccl/serverccl/api_v2_tenant_test.go b/pkg/ccl/serverccl/api_v2_tenant_test.go new file mode 100644 index 000000000000..4b83dd45fd34 --- /dev/null +++ b/pkg/ccl/serverccl/api_v2_tenant_test.go @@ -0,0 +1,88 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package serverccl + +import ( + "context" + "encoding/json" + "fmt" + "io" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestExecSQL(t *testing.T) { + defer leaktest.AfterTest(t)() + server.SQLAPIClock = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) + defer func() { + server.SQLAPIClock = timeutil.DefaultTimeSource{} + }() + + ctx := context.Background() + + testHelper := NewTestTenantHelper(t, 3 /* tenantClusterSize */, base.TestingKnobs{}) + defer testHelper.Cleanup(ctx, t) + + tenantCluster := testHelper.TestCluster() + adminClient := tenantCluster.TenantAdminHTTPClient(t, 0) + nonAdminClient := tenantCluster.TenantHTTPClient(t, 0, false) + + datadriven.RunTest(t, "testdata/api_v2_sql", + func(t *testing.T, d *datadriven.TestData) string { + if d.Cmd != "sql" { + t.Fatal("Only sql command is accepted in this test") + } + + var client *httpClient + if d.HasArg("admin") { + client = adminClient + } + if d.HasArg("non-admin") { + client = nonAdminClient + } + + resp, err := client.PostJSONRawChecked( + "/api/v2/sql/", + []byte(d.Input), + ) + require.NoError(t, err) + defer resp.Body.Close() + + r, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + if d.HasArg("expect-error") { + type jsonError struct { + Code string `json:"code"` + Message string `json:"message"` + } + type errorResp struct { + Error jsonError `json:"error"` + } + + er := errorResp{} + err := json.Unmarshal(r, &er) + require.NoError(t, err) + return fmt.Sprintf("%s|%s", er.Error.Code, er.Error.Message) + } + var u interface{} + err = json.Unmarshal(r, &u) 
+ require.NoError(t, err) + s, err := json.MarshalIndent(u, "", " ") + require.NoError(t, err) + return string(s) + }, + ) +} diff --git a/pkg/ccl/serverccl/tenant_test_utils.go b/pkg/ccl/serverccl/tenant_test_utils.go index df44cbaece3e..1cf5acc6f499 100644 --- a/pkg/ccl/serverccl/tenant_test_utils.go +++ b/pkg/ccl/serverccl/tenant_test_utils.go @@ -314,6 +314,10 @@ func (c *httpClient) PostJSONChecked( return httputil.PostJSON(c.client, c.baseURL+path, request, response) } +func (c *httpClient) PostJSONRawChecked(path string, request []byte) (*http.Response, error) { + return httputil.PostJSONRaw(c.client, c.baseURL+path, request) +} + func (c *httpClient) Close() { c.client.CloseIdleConnections() } diff --git a/pkg/ccl/serverccl/testdata/api_v2_sql b/pkg/ccl/serverccl/testdata/api_v2_sql new file mode 100644 index 000000000000..ee1171781afa --- /dev/null +++ b/pkg/ccl/serverccl/testdata/api_v2_sql @@ -0,0 +1,500 @@ +subtest sql_select_no_execute + +sql admin +{ + "database": "system", + "statements": [{"sql": "SELECT username FROM users where username = $1", "arguments": ["admin"]}] +} +---- +{ + "num_statements": 1, + "request": { + "application_name": "$ api-v2-sql", + "database": "system", + "execute": false, + "max_result_size": 10000, + "statements": [ + { + "arguments": [ + "admin" + ], + "sql": "SELECT username FROM users WHERE username = $1" + } + ], + "timeout": "5s" + } +} + +subtest end + +subtest sql_select_users + +sql admin +{ + "database": "system", + "execute": true, + "statements": [{"sql": "SELECT username FROM users where username = $1", "arguments": ["admin"]}] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "username", + "oid": 25, + "type": "STRING" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "username": "admin" + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "SELECT" + } + ] + }, + "num_statements": 1 +} + +subtest end + +subtest 
regression_test_for_84385 + +# Regression test for #84385. +sql admin +{ + "database": "system", + "execute": true, + "statements": [{"sql": "SELECT 1, 2"}] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "?column?", + "oid": 20, + "type": "INT8" + }, + { + "name": "?column?", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "?column?": 2 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "SELECT" + } + ] + }, + "num_statements": 1 +} + +subtest end + +subtest select_user_no_admin + +sql non-admin expect-error +{ + "database": "system", + "execute": true, + "statements": [{"sql": "SELECT username FROM users where username = 'admin'"}] +} +---- +42501|executing stmt 1: run-query-via-api: user authentic_user_noadmin does not have SELECT privilege on relation users + +subtest end + +subtest sql_multiple_statements + +sql admin +{ + "database": "system", + "execute": true, + "statements": [ + {"sql": "SELECT username FROM users where username = 'admin'"}, + {"sql": "SELECT \"eventType\" FROM eventlog where \"eventType\" = 'node_restart'"} + ] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "username", + "oid": 25, + "type": "STRING" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + { + "username": "admin" + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "SELECT" + }, + { + "columns": [ + { + "name": "eventType", + "oid": 25, + "type": "STRING" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 2, + "tag": "SELECT" + } + ] + }, + "num_statements": 2 +} + +subtest end + +subtest sql_schema_changes + +sql admin +{ + "database": "mydb", + "execute": true, + "statements": [ + {"sql": "CREATE database mydb"}, + {"sql": "CREATE table mydb.test (id int)"}, + {"sql": "INSERT INTO test VALUES (1)"} + ] +} +---- +{ + 
"execution": { + "txn_results": [ + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "CREATE DATABASE" + }, + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 2, + "tag": "CREATE TABLE" + }, + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 1, + "start": "1970-01-01T00:00:00Z", + "statement": 3, + "tag": "INSERT" + } + ] + }, + "num_statements": 3 +} + +subtest end + +subtest sql_syntax_error + +sql admin expect-error +{ + "statements": [ + {"sql": "INSERT INTO WHERE"} + ] +} +---- +42601|parsing statement 1: at or near "where": syntax error + +subtest end + +subtest invalid_duration + +sql admin expect-error +{ + "timeout": "abcdef", + "statements": [ + {"sql": "INSERT INTO WHERE"} + ] +} +---- +XXUUU|time: invalid duration "abcdef" + +subtest end + +subtest sql_multiple_statements_in_one_line + +sql admin expect-error +{ + "statements": [ + {"sql": "SELECT username FROM users where username = 'admin'; SELECT username FROM users where username = 'admin'"} + ] +} +---- +XXUUU|parsing statement 1: expecting 1 statement, found 2 + +subtest end + +subtest sql_placeholder_errors + +sql admin expect-error +{ + "statements": [ + {"sql": "SELECT username FROM users where username = $1"} + ] +} +---- +XXUUU|parsing statement 1: expected 1 placeholder(s), got 0 + + +sql admin expect-error +{ + "statements": [ + {"sql": "SELECT username FROM users where username = $1", "arguments": ["blah", "blah"]} + ] +} +---- +XXUUU|parsing statement 1: expected 1 placeholder(s), got 2 + +subtest end + +subtest sql_create_table + +sql admin +{ + "database": "mydb", + "execute": true, + "statements": 
[{"sql": "CREATE TABLE foo (i INT PRIMARY KEY, j INT UNIQUE)"}] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "CREATE TABLE" + } + ] + }, + "num_statements": 1 +} + +subtest end + +subtest sql_alter_table + +sql admin +{ + "database": "mydb", + "execute": true, + "statements": [ + {"sql": "ALTER TABLE foo RENAME TO bar"}, + {"sql": "INSERT INTO bar (i) VALUES (1), (2)"}, + {"sql": "ALTER TABLE bar DROP COLUMN j"}, + {"sql": "ALTER TABLE bar ADD COLUMN k INT DEFAULT 42"} + ] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "ALTER TABLE" + }, + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 2, + "start": "1970-01-01T00:00:00Z", + "statement": 2, + "tag": "INSERT" + }, + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 3, + "tag": "ALTER TABLE" + }, + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 4, + "tag": "ALTER TABLE" + } + ] + }, + "num_statements": 4 +} + +sql admin +{ + "database": "mydb", + "execute": true, + "statements": [ + {"sql": "SELECT * FROM bar"} + ] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "i", + "oid": 20, + "type": "INT8" + }, + { + "name": "k", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows": [ + 
{ + "i": 1, + "k": 42 + }, + { + "i": 2, + "k": 42 + } + ], + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "SELECT" + } + ] + }, + "num_statements": 1 +} + +subtest end + +subtest sql_drop_table + +sql admin +{ + "database": "mydb", + "execute": true, + "statements": [ + {"sql": "DROP TABLE bar"} + ] +} +---- +{ + "execution": { + "txn_results": [ + { + "columns": [ + { + "name": "rows_affected", + "oid": 20, + "type": "INT8" + } + ], + "end": "1970-01-01T00:00:00Z", + "rows_affected": 0, + "start": "1970-01-01T00:00:00Z", + "statement": 1, + "tag": "DROP TABLE" + } + ] + }, + "num_statements": 1 +} + +subtest end diff --git a/pkg/server/api_v2.go b/pkg/server/api_v2.go index c035c95f46ad..3f19cbe715e8 100644 --- a/pkg/server/api_v2.go +++ b/pkg/server/api_v2.go @@ -43,6 +43,8 @@ import ( "net/http" "strconv" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" @@ -76,6 +78,15 @@ func getSQLUsername(ctx context.Context) username.SQLUsername { return username.MakeSQLUsernameFromPreNormalizedString(ctx.Value(webSessionUserKey{}).(string)) } +type apiV2ServerOpts struct { + admin *adminServer + status *statusServer + promRuleExporter *metric.PrometheusRuleExporter + tenantID roachpb.TenantID + sqlServer *SQLServer + db *kv.DB +} + // apiV2Server implements version 2 API endpoints, under apiV2Path. The // implementation of some endpoints is delegated to sub-servers (eg. auth // endpoints like `/login` and `/logout` are passed onto authServer), while @@ -89,21 +100,25 @@ type apiV2Server struct { status *statusServer promRuleExporter *metric.PrometheusRuleExporter mux *mux.Router + tenantID roachpb.TenantID + sqlServer *SQLServer + db *kv.DB } // newAPIV2Server returns a new apiV2Server. 
-func newAPIV2Server(ctx context.Context, s *Server) *apiV2Server { - authServer := newAuthenticationV2Server(ctx, s, apiV2Path) +func newAPIV2Server(ctx context.Context, opts *apiV2ServerOpts) *apiV2Server { + authServer := newAuthenticationV2Server(ctx, opts.sqlServer, opts.sqlServer.cfg.Config, apiV2Path) innerMux := mux.NewRouter() - authMux := newAuthenticationV2Mux(authServer, innerMux) outerMux := mux.NewRouter() a := &apiV2Server{ - admin: s.admin, + admin: opts.admin, authServer: authServer, - status: s.status, + status: opts.status, mux: outerMux, - promRuleExporter: s.promRuleExporter, + promRuleExporter: opts.promRuleExporter, + sqlServer: opts.sqlServer, + db: opts.db, } a.registerRoutes(innerMux, authMux) return a @@ -130,35 +145,36 @@ func (a *apiV2Server) registerRoutes(innerMux *mux.Router, authMux http.Handler) // `role`, or does not have the roleoption `option`, an HTTP 403 forbidden // error is returned. routeDefinitions := []struct { - url string - handler http.HandlerFunc - requiresAuth bool - role apiRole - option roleoption.Option + url string + handler http.HandlerFunc + requiresAuth bool + role apiRole + option roleoption.Option + tenantEnabled bool }{ // Pass through auth-related endpoints to the auth server. - {"login/", a.authServer.ServeHTTP, false /* requiresAuth */, regularRole, noOption}, - {"logout/", a.authServer.ServeHTTP, false /* requiresAuth */, regularRole, noOption}, + {"login/", a.authServer.ServeHTTP, false /* requiresAuth */, regularRole, noOption, false}, + {"logout/", a.authServer.ServeHTTP, false /* requiresAuth */, regularRole, noOption, false}, // Directly register other endpoints in the api server. 
- {"sessions/", a.listSessions, true /* requiresAuth */, adminRole, noOption}, - {"nodes/", a.listNodes, true, adminRole, noOption}, + {"sessions/", a.listSessions, true /* requiresAuth */, adminRole, noOption, false}, + {"nodes/", a.listNodes, true, adminRole, noOption, false}, // Any endpoint returning range information requires an admin user. This is because range start/end keys // are sensitive info. - {"nodes/{node_id}/ranges/", a.listNodeRanges, true, adminRole, noOption}, - {"ranges/hot/", a.listHotRanges, true, adminRole, noOption}, - {"ranges/{range_id:[0-9]+}/", a.listRange, true, adminRole, noOption}, - {"health/", a.health, false, regularRole, noOption}, - {"users/", a.listUsers, true, regularRole, noOption}, - {"events/", a.listEvents, true, adminRole, noOption}, - {"databases/", a.listDatabases, true, regularRole, noOption}, - {"databases/{database_name:[\\w.]+}/", a.databaseDetails, true, regularRole, noOption}, - {"databases/{database_name:[\\w.]+}/grants/", a.databaseGrants, true, regularRole, noOption}, - {"databases/{database_name:[\\w.]+}/tables/", a.databaseTables, true, regularRole, noOption}, - {"databases/{database_name:[\\w.]+}/tables/{table_name:[\\w.]+}/", a.tableDetails, true, regularRole, noOption}, - {"rules/", a.listRules, false, regularRole, noOption}, + {"nodes/{node_id}/ranges/", a.listNodeRanges, true, adminRole, noOption, false}, + {"ranges/hot/", a.listHotRanges, true, adminRole, noOption, false}, + {"ranges/{range_id:[0-9]+}/", a.listRange, true, adminRole, noOption, false}, + {"health/", a.health, false, regularRole, noOption, false}, + {"users/", a.listUsers, true, regularRole, noOption, false}, + {"events/", a.listEvents, true, adminRole, noOption, false}, + {"databases/", a.listDatabases, true, regularRole, noOption, false}, + {"databases/{database_name:[\\w.]+}/", a.databaseDetails, true, regularRole, noOption, false}, + {"databases/{database_name:[\\w.]+}/grants/", a.databaseGrants, true, regularRole, noOption, false}, + 
{"databases/{database_name:[\\w.]+}/tables/", a.databaseTables, true, regularRole, noOption, false}, + {"databases/{database_name:[\\w.]+}/tables/{table_name:[\\w.]+}/", a.tableDetails, true, regularRole, noOption, false}, + {"rules/", a.listRules, false, regularRole, noOption, false}, - {"sql/", a.execSQL, true, regularRole, noOption}, + {"sql/", a.execSQL, true, regularRole, noOption, true}, } // For all routes requiring authentication, have the outer mux (a.mux) @@ -170,11 +186,16 @@ func (a *apiV2Server) registerRoutes(innerMux *mux.Router, authMux http.Handler) counter: telemetry.GetCounter(fmt.Sprintf("api.v2.%s", route.url)), inner: route.handler, } + if !route.tenantEnabled && !a.sqlServer.execCfg.Codec.ForSystemTenant() { + a.mux.Handle(apiV2Path+route.url, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "Not Available on Tenants", http.StatusNotImplemented) + })) + } if route.requiresAuth { a.mux.Handle(apiV2Path+route.url, authMux) if route.role != regularRole { handler = &roleAuthorizationMux{ - ie: a.admin.ie, + ie: a.sqlServer.internalExecutor, role: route.role, option: route.option, inner: handler, diff --git a/pkg/server/api_v2_auth.go b/pkg/server/api_v2_auth.go index 2d5e52b715ed..001b2f74444a 100644 --- a/pkg/server/api_v2_auth.go +++ b/pkg/server/api_v2_auth.go @@ -15,6 +15,7 @@ import ( "encoding/base64" "net/http" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql" @@ -44,13 +45,13 @@ type authenticationV2Server struct { // newAuthenticationV2Server creates a new authenticationV2Server for the given // outer Server, and base path. 
func newAuthenticationV2Server( - ctx context.Context, s *Server, basePath string, + ctx context.Context, s *SQLServer, cfg *base.Config, basePath string, ) *authenticationV2Server { simpleMux := http.NewServeMux() authServer := &authenticationV2Server{ - sqlServer: s.sqlServer, - authServer: newAuthenticationServer(s.cfg.Config, s.sqlServer), + sqlServer: s, + authServer: newAuthenticationServer(cfg, s), mux: simpleMux, ctx: ctx, basePath: basePath, diff --git a/pkg/server/api_v2_sql.go b/pkg/server/api_v2_sql.go index 93f69d9a5589..6929e66c66d5 100644 --- a/pkg/server/api_v2_sql.go +++ b/pkg/server/api_v2_sql.go @@ -34,7 +34,9 @@ import ( "github.com/cockroachdb/errors" ) -var sqlAPIClock timeutil.TimeSource = timeutil.DefaultTimeSource{} +// SQLAPIClock is exposed for override by tests. Tenant tests are in +// the serverccl package. +var SQLAPIClock timeutil.TimeSource = timeutil.DefaultTimeSource{} // swagger:operation POST /sql/ execSQL // @@ -262,7 +264,7 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { return } ctx := r.Context() - ctx = a.admin.server.AnnotateCtx(ctx) + ctx = a.sqlServer.ambientCtx.AnnotateCtx(ctx) // Read the request arguments. // Is there a request payload? @@ -372,9 +374,9 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { // We need a transaction to group the statements together. // We use TxnWithSteppingEnabled here even though we don't // use stepping below, because that buys us admission control. 
- ief := a.admin.server.sqlServer.execCfg.InternalExecutorFactory + ief := a.sqlServer.internalExecutorFactory runner = func(ctx context.Context, fn txnFunc) error { - return ief.TxnWithExecutor(ctx, a.admin.server.db, nil /* sessionData */, func( + return ief.TxnWithExecutor(ctx, a.db, nil /* sessionData */, func( ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, ) error { return fn(ctx, txn, ie) @@ -382,7 +384,7 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { } } else { runner = func(ctx context.Context, fn func(context.Context, *kv.Txn, sqlutil.InternalExecutor) error) error { - return fn(ctx, nil, a.admin.ie) + return fn(ctx, nil, a.sqlServer.internalExecutor) } } @@ -418,11 +420,11 @@ func (a *apiV2Server) execSQL(w http.ResponseWriter, r *http.Request) { returnType := stmt.stmt.AST.StatementReturnType() stmtErr := func() (retErr error) { - txnRes.Start = jsonTime(sqlAPIClock.Now()) + txnRes.Start = jsonTime(SQLAPIClock.Now()) txnRes.Statement = stmtIdx + 1 txnRes.Tag = stmt.stmt.AST.StatementTag() defer func() { - txnRes.End = jsonTime(sqlAPIClock.Now()) + txnRes.End = jsonTime(SQLAPIClock.Now()) if retErr != nil { retErr = errors.Wrapf(retErr, "executing stmt %d", stmtIdx+1) txnRes.Error = &jsonError{retErr} @@ -535,7 +537,7 @@ func (r *resultRow) MarshalJSON() ([]byte, error) { func (a *apiV2Server) shouldStop(ctx context.Context) error { select { - case <-a.admin.server.stopper.ShouldQuiesce(): + case <-a.sqlServer.stopper.ShouldQuiesce(): return errors.New("server is shutting down") case <-ctx.Done(): return ctx.Err() diff --git a/pkg/server/api_v2_sql_test.go b/pkg/server/api_v2_sql_test.go index 1fd4dcc851cc..2c4832100855 100644 --- a/pkg/server/api_v2_sql_test.go +++ b/pkg/server/api_v2_sql_test.go @@ -29,9 +29,9 @@ import ( func TestExecSQL(t *testing.T) { defer leaktest.AfterTest(t)() - sqlAPIClock = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) + SQLAPIClock = 
timeutil.NewManualTime(timeutil.FromUnixMicros(0)) defer func() { - sqlAPIClock = timeutil.DefaultTimeSource{} + SQLAPIClock = timeutil.DefaultTimeSource{} }() server, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) diff --git a/pkg/server/server.go b/pkg/server/server.go index 3d0a3b47cba4..39f68545c92e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1653,13 +1653,20 @@ func (s *Server) PreStart(ctx context.Context) error { // endpoints served by gwMux by the HTTP cookie authentication // check. if err := s.http.setupRoutes(ctx, - s.authentication, /* authnServer */ - s.adminAuthzCheck, /* adminAuthzCheck */ - s.recorder, /* metricSource */ - s.runtime, /* runtimeStatsSampler */ - gwMux, /* handleRequestsUnauthenticated */ - s.debug, /* handleDebugUnauthenticated */ - newAPIV2Server(ctx, s), /* apiServer */ + s.authentication, /* authnServer */ + s.adminAuthzCheck, /* adminAuthzCheck */ + s.recorder, /* metricSource */ + s.runtime, /* runtimeStatsSampler */ + gwMux, /* handleRequestsUnauthenticated */ + s.debug, /* handleDebugUnauthenticated */ + newAPIV2Server(ctx, &apiV2ServerOpts{ + admin: s.admin, + status: s.status, + promRuleExporter: s.promRuleExporter, + tenantID: roachpb.SystemTenantID, + sqlServer: s.sqlServer, + db: s.db, + }), /* apiServer */ ); err != nil { return err } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index b89c387fc85b..749e7349bc35 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -570,9 +570,11 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error { s.runtime, /* runtimeStatsSampler */ gwMux, /* handleRequestsUnauthenticated */ s.debug, /* handleDebugUnauthenticated */ - // TODO(knz): the apiV2 server should be enabled for secondary tenants. 
- // See: https://github.com/cockroachdb/cockroach/issues/80789 - nil, /* apiServer */ + newAPIV2Server(workersCtx, &apiV2ServerOpts{ + sqlServer: s.sqlServer, + tenantID: s.sqlCfg.TenantID, + db: s.db, + }), /* apiServer */ ); err != nil { return err } diff --git a/pkg/util/httputil/http.go b/pkg/util/httputil/http.go index 2802fd995fbf..5b3edfd6b813 100644 --- a/pkg/util/httputil/http.go +++ b/pkg/util/httputil/http.go @@ -75,6 +75,17 @@ func PostJSON(httpClient http.Client, path string, request, response protoutil.M return err } +// PostJSONRaw uses the supplied client to POST request to the URL specified by +// the parameters and returns the response. +func PostJSONRaw(httpClient http.Client, path string, request []byte) (*http.Response, error) { + buf := bytes.NewBuffer(request) + req, err := http.NewRequest("POST", path, buf) + if err != nil { + return nil, err + } + return doJSONRawRequest(httpClient, req) +} + // PostJSONWithRequest uses the supplied client to POST request to the URL // specified by the parameters and unmarshals the result into response. // @@ -121,3 +132,11 @@ func doJSONRequest( } return resp, jsonpb.Unmarshal(resp.Body, response) } + +func doJSONRawRequest(httpClient http.Client, req *http.Request) (*http.Response, error) { + if timeout := httpClient.Timeout; timeout > 0 { + req.Header.Set("Grpc-Timeout", strconv.FormatInt(timeout.Nanoseconds(), 10)+"n") + } + req.Header.Set(AcceptHeader, JSONContentType) + return httpClient.Do(req) +} From 64a9b9d7334f0c53d5401c173a572577b1f1e9be Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 11 Nov 2022 13:15:37 +0000 Subject: [PATCH 2/5] kvserver: add raftLogState struct Unite the info about the log size and last entry into a struct. 
Release note: None --- pkg/kv/kvserver/replica_raft.go | 39 ++++++++++++---------- pkg/kv/kvserver/replica_raftstorage.go | 46 +++++++++++++++----------- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 239b7e37ac49..9aff644fdc55 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -702,9 +702,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked( var hasReady bool var rd raft.Ready r.mu.Lock() - lastIndex := r.mu.lastIndex // used for append below - lastTerm := r.mu.lastTerm - raftLogSize := r.mu.raftLogSize + state := raftLogState{ // used for append below + lastIndex: r.mu.lastIndex, + lastTerm: r.mu.lastTerm, + raftLogSize: r.mu.raftLogSize, + } leaderID := r.mu.leaderID lastLeaderID := leaderID err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { @@ -807,9 +809,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // applySnapshot, but we also want to make sure we reflect these changes in // the local variables we're tracking here. 
r.mu.RLock() - lastIndex = r.mu.lastIndex - lastTerm = r.mu.lastTerm - raftLogSize = r.mu.raftLogSize + state = raftLogState{ + lastIndex: r.mu.lastIndex, + lastTerm: r.mu.lastTerm, + raftLogSize: r.mu.raftLogSize, + } r.mu.RUnlock() // We refresh pending commands after applying a snapshot because this @@ -861,7 +865,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, getNonDeterministicFailureExplanation(err), err } if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication { - if err := appTask.AckCommittedEntriesBeforeApplication(ctx, lastIndex); err != nil { + if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.lastIndex); err != nil { return stats, getNonDeterministicFailureExplanation(err), err } } @@ -928,7 +932,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( batch := r.store.Engine().NewUnindexedBatch(false /* writeOnly */) defer batch.Close() - prevLastIndex := lastIndex + prevLastIndex := state.lastIndex if len(rd.Entries) > 0 { stats.tAppendBegin = timeutil.Now() // All of the entries are appended to distinct keys, returning a new @@ -938,9 +942,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "during sideloading" return stats, expl, errors.Wrap(err, expl) } - raftLogSize += sideLoadedEntriesSize - if lastIndex, lastTerm, raftLogSize, err = logAppend( - ctx, r.raftMu.stateLoader.RaftLogPrefix(), batch, lastIndex, lastTerm, raftLogSize, thinEntries, + state.raftLogSize += sideLoadedEntriesSize + if state, err = logAppend( + ctx, r.raftMu.stateLoader.RaftLogPrefix(), batch, state, thinEntries, ); err != nil { const expl = "during append" return stats, expl, errors.Wrap(err, expl) @@ -1010,19 +1014,20 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "while purging sideloaded storage" return stats, expl, err } - raftLogSize -= purgedSize - if raftLogSize < 0 { + state.raftLogSize -= purgedSize + if state.raftLogSize < 0 { // Might have gone negative if node was 
recently restarted. - raftLogSize = 0 + state.raftLogSize = 0 } } // Update protected state - last index, last term, raft log size, and raft // leader ID. r.mu.Lock() - r.mu.lastIndex = lastIndex - r.mu.lastTerm = lastTerm - r.mu.raftLogSize = raftLogSize + // TODO(pavelkalinnikov): put raftLogState to r.mu directly instead of fields. + r.mu.lastIndex = state.lastIndex + r.mu.lastTerm = state.lastTerm + r.mu.raftLogSize = state.raftLogSize var becameLeader bool if r.mu.leaderID != leaderID { r.mu.leaderID = leaderID diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 87185e0ec3f6..e642e7986cc1 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -619,10 +619,17 @@ func snapshot( }, nil } -// logAppend adds the given entries to the raft log. Takes the previous -// lastIndex, lastTerm, raftLogSize, and returns new values. It's the -// caller's responsibility to maintain exclusive access to the raft log -// for the duration of the method call. +// raftLogState stores information about the last entry and the size of the log. +type raftLogState struct { + lastIndex uint64 + lastTerm uint64 + raftLogSize int64 +} + +// logAppend adds the given entries to the raft log. Takes the previous log +// state, and returns the updated state. It's the caller's responsibility to +// maintain exclusive access to the raft log for the duration of the method +// call. // // logAppend is intentionally oblivious to the existence of sideloaded // proposals. 
They are managed by the caller, including cleaning up obsolete @@ -631,13 +638,11 @@ func logAppend( ctx context.Context, raftLogPrefix roachpb.Key, rw storage.ReadWriter, - prevLastIndex uint64, - prevLastTerm uint64, - prevRaftLogSize int64, + prev raftLogState, entries []raftpb.Entry, -) (uint64, uint64, int64, error) { +) (raftLogState, error) { if len(entries) == 0 { - return prevLastIndex, prevLastTerm, prevRaftLogSize, nil + return prev, nil } var diff enginepb.MVCCStats var value roachpb.Value @@ -646,37 +651,38 @@ func logAppend( key := keys.RaftLogKeyFromPrefix(raftLogPrefix, ent.Index) if err := value.SetProto(ent); err != nil { - return 0, 0, 0, err + return raftLogState{}, err } value.InitChecksum(key) var err error - if ent.Index > prevLastIndex { + if ent.Index > prev.lastIndex { err = storage.MVCCBlindPut(ctx, rw, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) } else { err = storage.MVCCPut(ctx, rw, &diff, key, hlc.Timestamp{}, hlc.ClockTimestamp{}, value, nil /* txn */) } if err != nil { - return 0, 0, 0, err + return raftLogState{}, err } } - lastIndex := entries[len(entries)-1].Index - lastTerm := entries[len(entries)-1].Term + newLastIndex := entries[len(entries)-1].Index // Delete any previously appended log entries which never committed. - if prevLastIndex > 0 { - for i := lastIndex + 1; i <= prevLastIndex; i++ { + if prev.lastIndex > 0 { + for i := newLastIndex + 1; i <= prev.lastIndex; i++ { // Note that the caller is in charge of deleting any sideloaded payloads // (which they must only do *after* the batch has committed). 
_, err := storage.MVCCDelete(ctx, rw, &diff, keys.RaftLogKeyFromPrefix(raftLogPrefix, i), hlc.Timestamp{}, hlc.ClockTimestamp{}, nil) if err != nil { - return 0, 0, 0, err + return raftLogState{}, err } } } - - raftLogSize := prevRaftLogSize + diff.SysBytes - return lastIndex, lastTerm, raftLogSize, nil + return raftLogState{ + lastIndex: newLastIndex, + lastTerm: entries[len(entries)-1].Term, + raftLogSize: prev.raftLogSize + diff.SysBytes, + }, nil } // updateRangeInfo is called whenever a range is updated by ApplySnapshot From 110f23b092ef5038cfc1d25ed8aff1b9318eed2a Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 11 Nov 2022 16:43:43 +0000 Subject: [PATCH 3/5] kvserver: clarify the byte size field name Release note: None --- pkg/kv/kvserver/replica_raft.go | 22 +++++++++++----------- pkg/kv/kvserver/replica_raftstorage.go | 12 ++++++------ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 9aff644fdc55..ded7200ba404 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -703,9 +703,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( var rd raft.Ready r.mu.Lock() state := raftLogState{ // used for append below - lastIndex: r.mu.lastIndex, - lastTerm: r.mu.lastTerm, - raftLogSize: r.mu.raftLogSize, + lastIndex: r.mu.lastIndex, + lastTerm: r.mu.lastTerm, + byteSize: r.mu.raftLogSize, } leaderID := r.mu.leaderID lastLeaderID := leaderID @@ -810,9 +810,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // the local variables we're tracking here. 
r.mu.RLock() state = raftLogState{ - lastIndex: r.mu.lastIndex, - lastTerm: r.mu.lastTerm, - raftLogSize: r.mu.raftLogSize, + lastIndex: r.mu.lastIndex, + lastTerm: r.mu.lastTerm, + byteSize: r.mu.raftLogSize, } r.mu.RUnlock() @@ -942,7 +942,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "during sideloading" return stats, expl, errors.Wrap(err, expl) } - state.raftLogSize += sideLoadedEntriesSize + state.byteSize += sideLoadedEntriesSize if state, err = logAppend( ctx, r.raftMu.stateLoader.RaftLogPrefix(), batch, state, thinEntries, ); err != nil { @@ -1014,10 +1014,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "while purging sideloaded storage" return stats, expl, err } - state.raftLogSize -= purgedSize - if state.raftLogSize < 0 { + state.byteSize -= purgedSize + if state.byteSize < 0 { // Might have gone negative if node was recently restarted. - state.raftLogSize = 0 + state.byteSize = 0 } } @@ -1027,7 +1027,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // TODO(pavelkalinnikov): put raftLogState to r.mu directly instead of fields. r.mu.lastIndex = state.lastIndex r.mu.lastTerm = state.lastTerm - r.mu.raftLogSize = state.raftLogSize + r.mu.raftLogSize = state.byteSize var becameLeader bool if r.mu.leaderID != leaderID { r.mu.leaderID = leaderID diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index e642e7986cc1..6db8389bf6c8 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -621,9 +621,9 @@ func snapshot( // raftLogState stores information about the last entry and the size of the log. type raftLogState struct { - lastIndex uint64 - lastTerm uint64 - raftLogSize int64 + lastIndex uint64 + lastTerm uint64 + byteSize int64 } // logAppend adds the given entries to the raft log. 
Takes the previous log @@ -679,9 +679,9 @@ func logAppend( } } return raftLogState{ - lastIndex: newLastIndex, - lastTerm: entries[len(entries)-1].Term, - raftLogSize: prev.raftLogSize + diff.SysBytes, + lastIndex: newLastIndex, + lastTerm: entries[len(entries)-1].Term, + byteSize: prev.byteSize + diff.SysBytes, }, nil } From 8087255148e3c66fef79f01e90d0c4a97342dc35 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 11 Nov 2022 13:35:06 +0000 Subject: [PATCH 4/5] kvserver: factor out storing log entries in Replica Release note: None --- pkg/kv/kvserver/replica_raft.go | 158 ++++++++++++++++++-------------- 1 file changed, 88 insertions(+), 70 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ded7200ba404..c3d54079279e 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -927,77 +927,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.traceMessageSends(msgApps, "sending msgApp") r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) - // Use a more efficient write-only batch because we don't need to do any - // reads from the batch. Any reads are performed on the underlying DB. - batch := r.store.Engine().NewUnindexedBatch(false /* writeOnly */) - defer batch.Close() - prevLastIndex := state.lastIndex - if len(rd.Entries) > 0 { - stats.tAppendBegin = timeutil.Now() - // All of the entries are appended to distinct keys, returning a new - // last index. 
- thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := maybeSideloadEntries(ctx, rd.Entries, r.raftMu.sideloaded) - if err != nil { - const expl = "during sideloading" - return stats, expl, errors.Wrap(err, expl) - } - state.byteSize += sideLoadedEntriesSize - if state, err = logAppend( - ctx, r.raftMu.stateLoader.RaftLogPrefix(), batch, state, thinEntries, - ); err != nil { - const expl = "during append" - return stats, expl, errors.Wrap(err, expl) - } - stats.appendedRegularCount += len(thinEntries) - numSideloaded - stats.appendedRegularBytes += otherEntriesSize - stats.appendedSideloadedCount += numSideloaded - stats.appendedSideloadedBytes += sideLoadedEntriesSize - stats.tAppendEnd = timeutil.Now() - } - - if !raft.IsEmptyHardState(rd.HardState) { - if !r.IsInitialized() && rd.HardState.Commit != 0 { - log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) - } - // NB: Note that without additional safeguards, it's incorrect to write - // the HardState before appending rd.Entries. When catching up, a follower - // will receive Entries that are immediately Committed in the same - // Ready. If we persist the HardState but happen to lose the Entries, - // assertions can be tripped. - // - // We have both in the same batch, so there's no problem. If that ever - // changes, we must write and sync the Entries before the HardState. - if err := r.raftMu.stateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { - const expl = "during setHardState" - return stats, expl, errors.Wrap(err, expl) - } - } - // Synchronously commit the batch with the Raft log entries and Raft hard - // state as we're promising not to lose this data. - // - // Note that the data is visible to other goroutines before it is synced to - // disk. This is fine. The important constraints are that these syncs happen - // before Raft messages are sent and before the call to RawNode.Advance. 
Our - // regular locking is sufficient for this and if other goroutines can see the - // data early, that's fine. In particular, snapshots are not a problem (I - // think they're the only thing that might access log entries or HardState - // from other goroutines). Snapshots do not include either the HardState or - // uncommitted log entries, and even if they did include log entries that - // were not persisted to disk, it wouldn't be a problem because raft does not - // infer the that entries are persisted on the node that sends a snapshot. - stats.tPebbleCommitBegin = timeutil.Now() - stats.pebbleBatchBytes = int64(batch.Len()) - sync := rd.MustSync && !disableSyncRaftLog.Get(&r.store.cfg.Settings.SV) - if err := batch.Commit(sync); err != nil { - const expl = "while committing batch" - return stats, expl, errors.Wrap(err, expl) - } - stats.sync = sync - stats.tPebbleCommitEnd = timeutil.Now() - if rd.MustSync { - r.store.metrics.RaftLogCommitLatency.RecordValue( - stats.tPebbleCommitEnd.Sub(stats.tPebbleCommitBegin).Nanoseconds()) + if state, err = r.storeLogEntries(ctx, state, rd, &stats); err != nil { + const expl = "while storing log entries" + return stats, expl, err } if len(rd.Entries) > 0 { @@ -1146,6 +1079,91 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, "", nil } +// storeLogEntries persists newly appended raft Entries to the log storage. +// Accepts the state of the log before the operation, returns the state after. +// Persists HardState atomically with, or strictly after Entries. +// +// TODO(pavelkalinnikov): raft.Ready combines multiple roles. For a stricter +// separation between log and state storage, introduce a log-specific Ready, +// consisting of committed Entries, HardState, and MustSync. +func (r *Replica) storeLogEntries( + ctx context.Context, state raftLogState, rd raft.Ready, stats *handleRaftReadyStats, +) (raftLogState, error) { + // TODO(pavelkalinnikov): Doesn't this comment contradict the code? 
+ // Use a more efficient write-only batch because we don't need to do any + // reads from the batch. Any reads are performed on the underlying DB. + batch := r.store.Engine().NewUnindexedBatch(false /* writeOnly */) + defer batch.Close() + + if len(rd.Entries) > 0 { + stats.tAppendBegin = timeutil.Now() + // All of the entries are appended to distinct keys, returning a new + // last index. + thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := maybeSideloadEntries(ctx, rd.Entries, r.raftMu.sideloaded) + if err != nil { + const expl = "during sideloading" + return raftLogState{}, errors.Wrap(err, expl) + } + state.byteSize += sideLoadedEntriesSize + if state, err = logAppend( + ctx, r.raftMu.stateLoader.RaftLogPrefix(), batch, state, thinEntries, + ); err != nil { + const expl = "during append" + return raftLogState{}, errors.Wrap(err, expl) + } + stats.appendedRegularCount += len(thinEntries) - numSideloaded + stats.appendedRegularBytes += otherEntriesSize + stats.appendedSideloadedCount += numSideloaded + stats.appendedSideloadedBytes += sideLoadedEntriesSize + stats.tAppendEnd = timeutil.Now() + } + + if !raft.IsEmptyHardState(rd.HardState) { + if !r.IsInitialized() && rd.HardState.Commit != 0 { + log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) + } + // NB: Note that without additional safeguards, it's incorrect to write + // the HardState before appending rd.Entries. When catching up, a follower + // will receive Entries that are immediately Committed in the same + // Ready. If we persist the HardState but happen to lose the Entries, + // assertions can be tripped. + // + // We have both in the same batch, so there's no problem. If that ever + // changes, we must write and sync the Entries before the HardState. 
+ if err := r.raftMu.stateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { + const expl = "during setHardState" + return raftLogState{}, errors.Wrap(err, expl) + } + } + // Synchronously commit the batch with the Raft log entries and Raft hard + // state as we're promising not to lose this data. + // + // Note that the data is visible to other goroutines before it is synced to + // disk. This is fine. The important constraints are that these syncs happen + // before Raft messages are sent and before the call to RawNode.Advance. Our + // regular locking is sufficient for this and if other goroutines can see the + // data early, that's fine. In particular, snapshots are not a problem (I + // think they're the only thing that might access log entries or HardState + // from other goroutines). Snapshots do not include either the HardState or + // uncommitted log entries, and even if they did include log entries that + // were not persisted to disk, it wouldn't be a problem because raft does not + // infer the that entries are persisted on the node that sends a snapshot. + stats.tPebbleCommitBegin = timeutil.Now() + stats.pebbleBatchBytes = int64(batch.Len()) + sync := rd.MustSync && !disableSyncRaftLog.Get(&r.store.cfg.Settings.SV) + if err := batch.Commit(sync); err != nil { + const expl = "while committing batch" + return raftLogState{}, errors.Wrap(err, expl) + } + stats.sync = sync + stats.tPebbleCommitEnd = timeutil.Now() + if rd.MustSync { + r.store.metrics.RaftLogCommitLatency.RecordValue( + stats.tPebbleCommitEnd.Sub(stats.tPebbleCommitBegin).Nanoseconds()) + } + return state, nil +} + // splitMsgApps splits the Raft message slice into two slices, one containing // MsgApps and one containing all other message types. Each slice retains the // relative ordering between messages in the original slice. 
From 55af13857c9d5b7fbb10d070d091e0898b1a26ec Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 11 Nov 2022 18:02:35 +0000 Subject: [PATCH 5/5] kvserver: introduce stub of the log storage There are a few interfaces and types essential for persisting appended log entries to storage. This commit consolidates them in a logStore struct which in the future is becoming the separated log storage. Release note: None --- pkg/kv/kvserver/replica_raft.go | 48 ++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c3d54079279e..2ff375e961f4 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -928,7 +929,22 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) prevLastIndex := state.lastIndex - if state, err = r.storeLogEntries(ctx, state, rd, &stats); err != nil { + + // TODO(pavelkalinnikov): find a way to move it to storeEntries. + if !raft.IsEmptyHardState(rd.HardState) { + if !r.IsInitialized() && rd.HardState.Commit != 0 { + log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) + } + } + // TODO(pavelkalinnikov): construct and store this in Replica. 
+ s := logStore{ + engine: r.store.engine, + sideload: r.raftMu.sideloaded, + stateLoader: r.raftMu.stateLoader, + settings: r.store.cfg.Settings, + metrics: r.store.metrics, + } + if state, err = s.storeEntries(ctx, state, rd, &stats); err != nil { const expl = "while storing log entries" return stats, expl, err } @@ -1079,34 +1095,45 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, "", nil } -// storeLogEntries persists newly appended raft Entries to the log storage. +// logStore is a stub of a separated Raft log storage. +// +// TODO(pavelkalinnikov): move to its own package. +type logStore struct { + engine storage.Engine + sideload SideloadStorage + stateLoader stateloader.StateLoader + settings *cluster.Settings + metrics *StoreMetrics +} + +// storeEntries persists newly appended Raft log Entries to the log storage. // Accepts the state of the log before the operation, returns the state after. // Persists HardState atomically with, or strictly after Entries. // // TODO(pavelkalinnikov): raft.Ready combines multiple roles. For a stricter // separation between log and state storage, introduce a log-specific Ready, // consisting of committed Entries, HardState, and MustSync. -func (r *Replica) storeLogEntries( +func (s *logStore) storeEntries( ctx context.Context, state raftLogState, rd raft.Ready, stats *handleRaftReadyStats, ) (raftLogState, error) { // TODO(pavelkalinnikov): Doesn't this comment contradict the code? // Use a more efficient write-only batch because we don't need to do any // reads from the batch. Any reads are performed on the underlying DB. - batch := r.store.Engine().NewUnindexedBatch(false /* writeOnly */) + batch := s.engine.NewUnindexedBatch(false /* writeOnly */) defer batch.Close() if len(rd.Entries) > 0 { stats.tAppendBegin = timeutil.Now() // All of the entries are appended to distinct keys, returning a new // last index. 
- thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := maybeSideloadEntries(ctx, rd.Entries, r.raftMu.sideloaded) + thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := maybeSideloadEntries(ctx, rd.Entries, s.sideload) if err != nil { const expl = "during sideloading" return raftLogState{}, errors.Wrap(err, expl) } state.byteSize += sideLoadedEntriesSize if state, err = logAppend( - ctx, r.raftMu.stateLoader.RaftLogPrefix(), batch, state, thinEntries, + ctx, s.stateLoader.RaftLogPrefix(), batch, state, thinEntries, ); err != nil { const expl = "during append" return raftLogState{}, errors.Wrap(err, expl) @@ -1119,9 +1146,6 @@ func (r *Replica) storeLogEntries( } if !raft.IsEmptyHardState(rd.HardState) { - if !r.IsInitialized() && rd.HardState.Commit != 0 { - log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) - } // NB: Note that without additional safeguards, it's incorrect to write // the HardState before appending rd.Entries. When catching up, a follower // will receive Entries that are immediately Committed in the same @@ -1130,7 +1154,7 @@ func (r *Replica) storeLogEntries( // // We have both in the same batch, so there's no problem. If that ever // changes, we must write and sync the Entries before the HardState. - if err := r.raftMu.stateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { + if err := s.stateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { const expl = "during setHardState" return raftLogState{}, errors.Wrap(err, expl) } @@ -1150,7 +1174,7 @@ func (r *Replica) storeLogEntries( // infer the that entries are persisted on the node that sends a snapshot. 
stats.tPebbleCommitBegin = timeutil.Now() stats.pebbleBatchBytes = int64(batch.Len()) - sync := rd.MustSync && !disableSyncRaftLog.Get(&r.store.cfg.Settings.SV) + sync := rd.MustSync && !disableSyncRaftLog.Get(&s.settings.SV) if err := batch.Commit(sync); err != nil { const expl = "while committing batch" return raftLogState{}, errors.Wrap(err, expl) @@ -1158,7 +1182,7 @@ func (r *Replica) storeLogEntries( stats.sync = sync stats.tPebbleCommitEnd = timeutil.Now() if rd.MustSync { - r.store.metrics.RaftLogCommitLatency.RecordValue( + s.metrics.RaftLogCommitLatency.RecordValue( stats.tPebbleCommitEnd.Sub(stats.tPebbleCommitBegin).Nanoseconds()) } return state, nil