From 09c7f9f1569a432e719c077eebb119cc702fcd18 Mon Sep 17 00:00:00 2001 From: Alex Barganier Date: Fri, 18 Aug 2023 10:56:22 -0400 Subject: [PATCH 1/4] pkg/util/log: rename `log.Flush` to `log.FlushFiles` Currently, the `log.Flush` function's name is misleading. Since it was originally introduced, we've added additional types of logging sinks (e.g. fluent-server and http-server network sinks). Therefore, `log.Flush` didn't actually perform a full flush of all sinks active in the logger. We're going to introduce a way to do so, but first, we need to rename this function to be more descriptive. This patch does so. Release note: none --- pkg/ccl/auditloggingccl/audit_logging_test.go | 12 ++++++------ pkg/ccl/backupccl/utils_test.go | 2 +- pkg/ccl/changefeedccl/changefeed_test.go | 12 ++++++------ pkg/ccl/changefeedccl/helpers_test.go | 2 +- pkg/ccl/changefeedccl/nemeses_test.go | 2 +- pkg/ccl/telemetryccl/telemetry_logging_test.go | 4 ++-- pkg/ccl/testccl/authccl/auth_test.go | 4 ++-- pkg/ccl/testccl/sqlccl/tenant_gc_test.go | 2 +- pkg/cli/connect.go | 2 +- pkg/cli/debug_send_kv_batch_test.go | 2 +- pkg/cli/demo.go | 2 +- pkg/cli/start.go | 4 ++-- pkg/jobs/jobstest/logutils.go | 2 +- pkg/jobs/registry_external_test.go | 2 +- pkg/kv/kvserver/client_raft_test.go | 2 +- .../protectedts/ptstorage/storage_test.go | 4 ++-- pkg/kv/kvserver/replicate_queue_test.go | 2 +- pkg/security/certmgr/cert_manager_test.go | 2 +- pkg/security/certs_rotation_test.go | 2 +- pkg/server/server_test.go | 2 +- pkg/server/status.go | 6 +++--- pkg/server/status/runtime_stats_test.go | 2 +- pkg/server/structlogging/hot_ranges_log_test.go | 2 +- pkg/sql/admin_audit_log_test.go | 10 +++++----- pkg/sql/event_log_test.go | 4 ++-- pkg/sql/pgwire/auth_test.go | 4 ++-- .../captured_index_usage_stats_test.go | 2 +- pkg/sql/telemetry_logging_test.go | 16 ++++++++-------- .../upgrades/schema_changes_external_test.go | 2 +- pkg/util/hlc/hlc_test.go | 4 ++-- pkg/util/log/clog_test.go | 2 +- pkg/util/log/doc.go | 4 ++-- pkg/util/log/file_log_gc_test.go | 4 ++-- pkg/util/log/file_sync_buffer.go | 2 +- pkg/util/log/formats_test.go | 2 +- pkg/util/log/log_flush.go | 8 ++++---- pkg/util/log/logcrash/crash_reporting.go | 2 +- pkg/util/log/secondary_log_test.go | 4 ++-- pkg/util/log/test_log_scope.go | 4 ++-- 39 files changed, 76 insertions(+), 76 deletions(-) diff --git a/pkg/ccl/auditloggingccl/audit_logging_test.go b/pkg/ccl/auditloggingccl/audit_logging_test.go index 575157eb5d98..652eb5b5ba6e 100644 --- a/pkg/ccl/auditloggingccl/audit_logging_test.go +++ b/pkg/ccl/auditloggingccl/audit_logging_test.go @@ -62,7 +62,7 @@ func TestRoleBasedAuditEnterpriseGated(t *testing.T) { // Run a test query. rootRunner.Exec(t, `SHOW CLUSTER SETTING sql.log.user_audit`) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -87,7 +87,7 @@ func TestRoleBasedAuditEnterpriseGated(t *testing.T) { // Run a test query. 
rootRunner.Exec(t, `SHOW CLUSTER SETTING sql.log.user_audit`) - log.Flush() + log.FlushFiles() entries, err = log.FetchEntriesFromFiles( 0, @@ -214,7 +214,7 @@ func TestSingleRoleAuditLogging(t *testing.T) { rootRunner.Exec(t, fmt.Sprintf("REVOKE %s from testuser", td.role)) } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -317,7 +317,7 @@ func TestMultiRoleAuditLogging(t *testing.T) { testRunner.Exec(t, query) } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -412,7 +412,7 @@ func TestReducedAuditConfig(t *testing.T) { // for the user at that time. testRunner.Exec(t, testQuery) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -439,7 +439,7 @@ func TestReducedAuditConfig(t *testing.T) { // The user now has a corresponding audit setting. We use a new query here to differentiate. testRunner2.Exec(t, `GRANT SELECT ON TABLE u TO root`) - log.Flush() + log.FlushFiles() entries, err = log.FetchEntriesFromFiles( 0, diff --git a/pkg/ccl/backupccl/utils_test.go b/pkg/ccl/backupccl/utils_test.go index 55de9084f969..00327d259ded 100644 --- a/pkg/ccl/backupccl/utils_test.go +++ b/pkg/ccl/backupccl/utils_test.go @@ -537,7 +537,7 @@ func requireRecoveryEvent( expected eventpb.RecoveryEvent, ) { testutils.SucceedsSoon(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( startTime, math.MaxInt64, diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index bc5d4255735b..95ba8fb8a998 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1725,7 +1725,7 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) { cdcTest(t, testFn) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) if err != nil { @@ -2127,7 +2127,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) if err != nil { @@ -2323,7 +2323,7 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) { cdcTestWithSystem(t, testFn) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) if err != nil { @@ -2480,7 +2480,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { cdcTestWithSystem(t, testFn) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) if err != nil { @@ -2543,7 +2543,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { } cdcTestWithSystem(t, testFn) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) if err != nil { @@ -2574,7 +2574,7 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { } cdcTest(t, testFn) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) if err != nil { diff --git a/pkg/ccl/changefeedccl/helpers_test.go 
b/pkg/ccl/changefeedccl/helpers_test.go index fcaa3c173a92..cde527bcb9c7 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -1176,7 +1176,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`) func checkStructuredLogs(t *testing.T, eventType string, startTime int64) []string { var matchingEntries []string testutils.SucceedsSoon(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(startTime, math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData) if err != nil { diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index abf932980c43..146b719d0ae9 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -54,7 +54,7 @@ func TestChangefeedNemeses(t *testing.T) { // // TODO(knz): This seems incorrect, see issue #109417. cdcTest(t, testFn, feedTestNoTenants) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData) if err != nil { diff --git a/pkg/ccl/telemetryccl/telemetry_logging_test.go b/pkg/ccl/telemetryccl/telemetry_logging_test.go index 88832a817cd5..a77ab7579542 100644 --- a/pkg/ccl/telemetryccl/telemetry_logging_test.go +++ b/pkg/ccl/telemetryccl/telemetry_logging_test.go @@ -107,7 +107,7 @@ func TestTelemetryLogRegions(t *testing.T) { sqlDB.Exec(t, tc.query) } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -322,7 +322,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) { execTimestamp++ } - log.Flush() + log.FlushFiles() var filteredSampleQueries []logpb.Entry testutils.SucceedsSoon(t, func() error { diff --git a/pkg/ccl/testccl/authccl/auth_test.go b/pkg/ccl/testccl/authccl/auth_test.go index c146e3a0e783..49995f782bce 100644 --- a/pkg/ccl/testccl/authccl/auth_test.go +++ b/pkg/ccl/testccl/authccl/auth_test.go @@ -526,7 +526,7 @@ func TestClientAddrOverride(t *testing.T) { t.Run("check-server-log-uses-override", func(t *testing.T) { // Wait for the disconnection event in logs. testutils.SucceedsSoon(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, sessionTerminatedRe, log.WithMarkedSensitiveData) if err != nil { @@ -539,7 +539,7 @@ func TestClientAddrOverride(t *testing.T) { }) // Now we want to check that the logging tags are also updated. 
- log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, authLogFileRe, log.WithMarkedSensitiveData) if err != nil { diff --git a/pkg/ccl/testccl/sqlccl/tenant_gc_test.go b/pkg/ccl/testccl/sqlccl/tenant_gc_test.go index 19d09cea29d8..97b1161fc2f3 100644 --- a/pkg/ccl/testccl/sqlccl/tenant_gc_test.go +++ b/pkg/ccl/testccl/sqlccl/tenant_gc_test.go @@ -184,7 +184,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) { checkGCBlockedByPTS := func(t *testing.T, sj *jobs.StartableJob, tenID uint64) { testutils.SucceedsSoon(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile(fmt.Sprintf("GC TTL for dropped tenant %d has expired, but protected timestamp record\\(s\\)", tenID)), log.WithFlattenedSensitiveData) diff --git a/pkg/cli/connect.go b/pkg/cli/connect.go index b60c5b91296c..e597046d81ed 100644 --- a/pkg/cli/connect.go +++ b/pkg/cli/connect.go @@ -76,7 +76,7 @@ func runConnectInit(cmd *cobra.Command, args []string) (retErr error) { } // Ensure that log files are populated when the process terminates. - defer log.Flush() + defer log.FlushFiles() peers := []string(serverCfg.JoinList) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/cli/debug_send_kv_batch_test.go b/pkg/cli/debug_send_kv_batch_test.go index 9dca99252e9b..bf5cf588593a 100644 --- a/pkg/cli/debug_send_kv_batch_test.go +++ b/pkg/cli/debug_send_kv_batch_test.go @@ -123,7 +123,7 @@ func TestSendKVBatch(t *testing.T) { require.JSONEq(t, jsonResponse, output) // Check that a structured log event was emitted. - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(start.UnixNano(), timeutil.Now().UnixNano(), 1, regexp.MustCompile("debug_send_kv_batch"), log.WithFlattenedSensitiveData) require.NoError(t, err) diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index e4f023eaf814..eb9e65210135 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -377,7 +377,7 @@ func runDemoInternal( } // Ensure the last few entries in the log files are flushed at the end. - defer log.Flush() + defer log.FlushFiles() return sqlCtx.Run(ctx, conn) } diff --git a/pkg/cli/start.go b/pkg/cli/start.go index 86314926e580..b742bb4c8387 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -729,7 +729,7 @@ func createAndStartServerAsync( go func() { // Ensure that the log files see the startup messages immediately. - defer log.Flush() + defer log.FlushFiles() // If anything goes dramatically wrong, use Go's panic/recover // mechanism to intercept the panic and log the panic details to // the error reporting server. @@ -1497,7 +1497,7 @@ func reportReadinessExternally(ctx context.Context, cmd *cobra.Command, waitForI // Ensure the configuration logging is written to disk in case a // process is waiting for the sdnotify readiness to read important // information from there. - log.Flush() + log.FlushFiles() // Signal readiness. This unblocks the process when running with // --background or under systemd. diff --git a/pkg/jobs/jobstest/logutils.go b/pkg/jobs/jobstest/logutils.go index 7299d0b7e4c4..e476e514d3c7 100644 --- a/pkg/jobs/jobstest/logutils.go +++ b/pkg/jobs/jobstest/logutils.go @@ -36,7 +36,7 @@ func CheckEmittedEvents( ) { // Check that the structured event was logged. 
testutils.SucceedsSoon(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(startTime, math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData) if err != nil { diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 5b0c441f4bd3..175f71f3279f 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -554,7 +554,7 @@ SELECT unnest(execution_errors) t *testing.T, id jobspb.JobID, status jobs.Status, from, to time.Time, cause string, ) { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( from.UnixNano(), to.UnixNano(), 2, regexp.MustCompile(fmt.Sprintf( diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index e2da24725b02..61b75d8dc6d3 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -1569,7 +1569,7 @@ func TestReceiveSnapshotLogging(t *testing.T) { // When ready, flush logs and check messages from store_raft.go since // call to repl.ChangeReplicas(..). <-signals.receiverDoneCh - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(), math.MaxInt64, 100, regexp.MustCompile(`store_raft\.go`), log.WithMarkedSensitiveData) require.NoError(t, err) diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index 8b4a444a35aa..c2e2700ea1a7 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -652,7 +652,7 @@ func TestCorruptData(t *testing.T) { require.NoError(t, err) } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg, log.WithFlattenedSensitiveData) require.NoError(t, err) @@ -739,7 +739,7 @@ func TestCorruptData(t *testing.T) { require.Nil(t, got) _, err = pts.GetState(ctx) require.NoError(t, err) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg, log.WithFlattenedSensitiveData) diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 23d97045c444..1772bf8af9bf 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -885,7 +885,7 @@ func TestReplicateQueueTracingOnError(t *testing.T) { // Flush logs and get log messages from replicate_queue.go since just // before calling store.Enqueue(..). - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(), math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData) require.NoError(t, err) diff --git a/pkg/security/certmgr/cert_manager_test.go b/pkg/security/certmgr/cert_manager_test.go index 623f2cda66b1..2d5982c9b8de 100644 --- a/pkg/security/certmgr/cert_manager_test.go +++ b/pkg/security/certmgr/cert_manager_test.go @@ -57,7 +57,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`) // Check that the structured event was logged. 
func checkLogStructEntry(t *testing.T, expectSuccess bool, beforeReload time.Time) error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(), math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData) if err != nil { diff --git a/pkg/security/certs_rotation_test.go b/pkg/security/certs_rotation_test.go index a9b939792033..cb839428323c 100644 --- a/pkg/security/certs_rotation_test.go +++ b/pkg/security/certs_rotation_test.go @@ -199,7 +199,7 @@ func TestRotateCerts(t *testing.T) { // the moment the structured logging event is actually // written to the log file. testutils.SucceedsSoon(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(), math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData) if err != nil { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 865316383629..34b70e468407 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -542,7 +542,7 @@ func TestPersistHLCUpperBound(t *testing.T) { var fatal bool defer log.ResetExitFunc() log.SetExitFunc(true /* hideStack */, func(r exit.Code) { - defer log.Flush() + defer log.FlushFiles() if r == exit.FatalError() { fatal = true } diff --git a/pkg/server/status.go b/pkg/server/status.go index 058170d3aa8a..448890c4bd73 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1237,7 +1237,7 @@ func (s *statusServer) LogFilesList( } return status.LogFilesList(ctx, req) } - log.Flush() + log.FlushFiles() logFiles, err := log.ListLogFiles() if err != nil { return nil, srverrors.ServerError(ctx, err) @@ -1277,7 +1277,7 @@ func (s *statusServer) LogFile( inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable) // Ensure that the latest log entries are available in files. - log.Flush() + log.FlushFiles() // Read the logs. reader, err := log.GetLogReader(req.File) @@ -1407,7 +1407,7 @@ func (s *statusServer) Logs( } // Ensure that the latest log entries are available in files. - log.Flush() + log.FlushFiles() // Read the logs. entries, err := log.FetchEntriesFromFiles( diff --git a/pkg/server/status/runtime_stats_test.go b/pkg/server/status/runtime_stats_test.go index 972ca8ddb3b4..1fb97947ee44 100644 --- a/pkg/server/status/runtime_stats_test.go +++ b/pkg/server/status/runtime_stats_test.go @@ -44,7 +44,7 @@ func TestStructuredEventLogging(t *testing.T) { time.Sleep(10 * time.Second) // Ensure that the entry hits the OS so it can be read back below. 
- log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(), math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData) diff --git a/pkg/server/structlogging/hot_ranges_log_test.go b/pkg/server/structlogging/hot_ranges_log_test.go index 3f863027d4d4..0e34bdd8eb7d 100644 --- a/pkg/server/structlogging/hot_ranges_log_test.go +++ b/pkg/server/structlogging/hot_ranges_log_test.go @@ -88,7 +88,7 @@ func TestHotRangesStats(t *testing.T) { }) testutils.SucceedsWithin(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, math.MaxInt64, diff --git a/pkg/sql/admin_audit_log_test.go b/pkg/sql/admin_audit_log_test.go index 699401768371..d27460c5ff05 100644 --- a/pkg/sql/admin_audit_log_test.go +++ b/pkg/sql/admin_audit_log_test.go @@ -71,7 +71,7 @@ func TestAdminAuditLogBasic(t *testing.T) { db.Exec(t, `SELECT 1;`) var selectAdminRe = regexp.MustCompile(`"EventType":"admin_query","Statement":"SELECT ‹1›","Tag":"SELECT","User":"root"`) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 10000, selectAdminRe, log.WithMarkedSensitiveData) @@ -112,7 +112,7 @@ func TestAdminAuditLogRegularUser(t *testing.T) { var selectRe = regexp.MustCompile(`SELECT 1`) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 10000, selectRe, log.WithMarkedSensitiveData) @@ -168,7 +168,7 @@ COMMIT; }, } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -253,7 +253,7 @@ COMMIT; }, } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -297,7 +297,7 @@ COMMIT; t.Fatal(err) } - log.Flush() + log.FlushFiles() entries, err = log.FetchEntriesFromFiles( 0, diff --git a/pkg/sql/event_log_test.go b/pkg/sql/event_log_test.go index ca237786077a..700731e85d91 100644 --- a/pkg/sql/event_log_test.go +++ b/pkg/sql/event_log_test.go @@ -82,7 +82,7 @@ func TestStructuredEventLogging(t *testing.T) { } // Ensure that the entries hit the OS so they can be read back below. - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(), math.MaxInt64, 10000, execLogRe, log.WithMarkedSensitiveData) @@ -736,7 +736,7 @@ func TestPerfLogging(t *testing.T) { } var logRe = regexp.MustCompile(tc.logRe) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( start, math.MaxInt64, 1000, logRe, log.WithMarkedSensitiveData, ) diff --git a/pkg/sql/pgwire/auth_test.go b/pkg/sql/pgwire/auth_test.go index 3354ae433107..76ac49106fd0 100644 --- a/pkg/sql/pgwire/auth_test.go +++ b/pkg/sql/pgwire/auth_test.go @@ -746,7 +746,7 @@ func TestClientAddrOverride(t *testing.T) { t.Run("check-server-log-uses-override", func(t *testing.T) { // Wait for the disconnection event in logs. testutils.SucceedsSoon(t, func() error { - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, sessionTerminatedRe, log.WithMarkedSensitiveData) if err != nil { @@ -759,7 +759,7 @@ func TestClientAddrOverride(t *testing.T) { }) // Now we want to check that the logging tags are also updated. 
- log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, authLogFileRe, log.WithMarkedSensitiveData) if err != nil { diff --git a/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go index 5a6f0febc34d..4d03816a54d6 100644 --- a/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go +++ b/pkg/sql/scheduledlogging/captured_index_usage_stats_test.go @@ -282,7 +282,7 @@ func checkNumTotalEntriesAndNumIndexEntries( expectedIndividualIndexEntries int, scheduleCompleteChan chan struct{}, ) error { - log.Flush() + log.FlushFiles() // Fetch log entries. entries, err := log.FetchEntriesFromFiles( 0, diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index 0401b2f5a2f2..061b8d4e4ff6 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -436,7 +436,7 @@ func TestTelemetryLogging(t *testing.T) { } } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -750,7 +750,7 @@ func TestTelemetryLoggingInternalEnabled(t *testing.T) { `TRUNCATE TABLE system.public.transaction_statistics`, } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -861,7 +861,7 @@ func TestTelemetryLoggingInternalConsoleEnabled(t *testing.T) { db.Exec(t, `SET application_name = $1`, tc.appName) db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.internal_console.enabled = $1;`, tc.logInternalConsole) db.Exec(t, query) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -964,7 +964,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { db.Exec(t, tc.query) } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -1168,7 +1168,7 @@ func TestTelemetryLogJoinTypesAndAlgorithms(t *testing.T) { db.Exec(t, tc.query) } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -1423,7 +1423,7 @@ func TestTelemetryScanCounts(t *testing.T) { db.Exec(t, tc.query) } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -1537,7 +1537,7 @@ $$` db.Exec(t, stmt) - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, @@ -1617,7 +1617,7 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) { `BEGIN`, `SELECT ‹1›`, `SELECT ‹2›`, `SELECT ‹3›`, `COMMIT`, } - log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, diff --git a/pkg/upgrade/upgrades/schema_changes_external_test.go b/pkg/upgrade/upgrades/schema_changes_external_test.go index a7bd9894c05e..ad799e8263e6 100644 --- a/pkg/upgrade/upgrades/schema_changes_external_test.go +++ b/pkg/upgrade/upgrades/schema_changes_external_test.go @@ -510,7 +510,7 @@ func testMigrationWithFailures( }) if test.waitForMigrationRestart { // Ensure that we have observed the expected number of ignored schema change jobs. 
- log.Flush() + log.FlushFiles() entries, err := log.FetchEntriesFromFiles( 0, math.MaxInt64, 10000, regexp.MustCompile("skipping.*operation as the schema change already exists."), diff --git a/pkg/util/hlc/hlc_test.go b/pkg/util/hlc/hlc_test.go index 50cd0e1c0298..0ae3f55427e1 100644 --- a/pkg/util/hlc/hlc_test.go +++ b/pkg/util/hlc/hlc_test.go @@ -438,7 +438,7 @@ func TestHLCEnforceWallTimeWithinBoundsInNow(t *testing.T) { var fatal bool defer log.ResetExitFunc() log.SetExitFunc(true /* hideStack */, func(r exit.Code) { - defer log.Flush() + defer log.FlushFiles() if r == exit.FatalError() { fatal = true } @@ -487,7 +487,7 @@ func TestHLCEnforceWallTimeWithinBoundsInUpdate(t *testing.T) { var fatal bool defer log.ResetExitFunc() log.SetExitFunc(true /* hideStack */, func(r exit.Code) { - defer log.Flush() + defer log.FlushFiles() if r == exit.FatalError() { fatal = true } diff --git a/pkg/util/log/clog_test.go b/pkg/util/log/clog_test.go index a8b7645db014..f6af4b89bf3f 100644 --- a/pkg/util/log/clog_test.go +++ b/pkg/util/log/clog_test.go @@ -651,7 +651,7 @@ func TestFileSeverityFilter(t *testing.T) { Infof(context.Background(), "test1") Errorf(context.Background(), "test2") - Flush() + FlushFiles() debugFileSink := debugFileSinkInfo.sink.(*fileSink) contents, err := os.ReadFile(debugFileSink.getFileName(t)) diff --git a/pkg/util/log/doc.go b/pkg/util/log/doc.go index fcea1c3e16db..dcaf9320b669 100644 --- a/pkg/util/log/doc.go +++ b/pkg/util/log/doc.go @@ -83,8 +83,8 @@ // // # Output // -// Log output is buffered and written periodically using Flush. Programs -// should call Flush before exiting to guarantee all log output is written. +// Log output is buffered and written periodically using FlushFiles. Programs +// should call FlushFiles before exiting to guarantee all log output is written. // // By default, all log statements write to files in a temporary directory. // This package provides several flags that modify this behavior. diff --git a/pkg/util/log/file_log_gc_test.go b/pkg/util/log/file_log_gc_test.go index 813d774a95cc..533af6f95044 100644 --- a/pkg/util/log/file_log_gc_test.go +++ b/pkg/util/log/file_log_gc_test.go @@ -158,7 +158,7 @@ func testLogGC(t *testing.T, fileSink *fileSink, logFn func(ctx context.Context, const newLogFiles = 20 for i := 1; i < newLogFiles; i++ { logFn(context.Background(), fmt.Sprint(i)) - Flush() + FlushFiles() } if _, err := expectFileCount(newLogFiles); err != nil { t.Fatal(err) @@ -169,7 +169,7 @@ func testLogGC(t *testing.T, fileSink *fileSink, logFn func(ctx context.Context, // Emit a log line which will rotate the files and trigger GC. logFn(context.Background(), "final") - Flush() + FlushFiles() succeedsSoon(t, func() error { _, err := expectFileCount(expectedFilesAfterGC) diff --git a/pkg/util/log/file_sync_buffer.go b/pkg/util/log/file_sync_buffer.go index 2fb5ce82efe8..5e0f3c8b2410 100644 --- a/pkg/util/log/file_sync_buffer.go +++ b/pkg/util/log/file_sync_buffer.go @@ -35,7 +35,7 @@ type syncBuffer struct { // Sync implements the flushSyncWriter interface. // -// Note: the other methods from flushSyncWriter (Flush, io.Writer) is +// Note: the other methods from flushSyncWriter (FlushFiles, io.Writer) is // implemented by the embedded *bufio.Writer directly. 
func (sb *syncBuffer) Sync() error { return sb.file.Sync() diff --git a/pkg/util/log/formats_test.go b/pkg/util/log/formats_test.go index 9a19413bed17..64ca18d76b65 100644 --- a/pkg/util/log/formats_test.go +++ b/pkg/util/log/formats_test.go @@ -71,7 +71,7 @@ func TestFormatRedaction(t *testing.T) { defer cleanupFn() Infof(ctx, "safe2 %s", "secret3") - Flush() + FlushFiles() contents, err := os.ReadFile(getDebugLogFileName(t)) require.NoError(t, err) diff --git a/pkg/util/log/log_flush.go b/pkg/util/log/log_flush.go index fa9431fa1604..a041fa427600 100644 --- a/pkg/util/log/log_flush.go +++ b/pkg/util/log/log_flush.go @@ -26,11 +26,11 @@ type flushSyncWriter interface { io.Writer } -// Flush explicitly flushes all pending log file I/O. +// FlushFiles explicitly flushes all pending log file I/O. // See also flushDaemon() that manages background (asynchronous) // flushes, and signalFlusher() that manages flushes in reaction to a // user signal. -func Flush() { +func FlushFiles() { _ = logging.allSinkInfos.iterFileSinks(func(l *fileSink) error { l.lockAndFlushAndMaybeSync(true /*doSync*/) return nil @@ -97,7 +97,7 @@ func signalFlusher() { ch := sysutil.RefreshSignaledChan() for sig := range ch { Ops.Infof(context.Background(), "%s received, flushing logs", sig) - Flush() + FlushFiles() } } @@ -109,5 +109,5 @@ func signalFlusher() { func StartAlwaysFlush() { logging.flushWrites.Set(true) // There may be something in the buffers already; flush it. - Flush() + FlushFiles() } diff --git a/pkg/util/log/logcrash/crash_reporting.go b/pkg/util/log/logcrash/crash_reporting.go index 18205dad11ed..6a1309099dcf 100644 --- a/pkg/util/log/logcrash/crash_reporting.go +++ b/pkg/util/log/logcrash/crash_reporting.go @@ -195,7 +195,7 @@ func ReportPanic(ctx context.Context, sv *settings.Values, r interface{}, depth // Ensure that the logs are flushed before letting a panic // terminate the server. - log.Flush() + log.FlushFiles() } // PanicAsError turns r into an error if it is not one already. diff --git a/pkg/util/log/secondary_log_test.go b/pkg/util/log/secondary_log_test.go index f691712bd851..abb94c228e74 100644 --- a/pkg/util/log/secondary_log_test.go +++ b/pkg/util/log/secondary_log_test.go @@ -71,7 +71,7 @@ func TestSecondaryLog(t *testing.T) { Infof(context.Background(), "test2") // Make sure the content made it to disk. - Flush() + FlushFiles() // Check that the messages indeed made it to different files. @@ -151,7 +151,7 @@ func TestListLogFilesIncludeSecondaryLogs(t *testing.T) { // Emit some logging and ensure the files gets created. ctx := context.Background() Sessions.Infof(ctx, "story time") - Flush() + FlushFiles() results, err := ListLogFiles() if err != nil { diff --git a/pkg/util/log/test_log_scope.go b/pkg/util/log/test_log_scope.go index f382f175e6c3..c68deb89e53e 100644 --- a/pkg/util/log/test_log_scope.go +++ b/pkg/util/log/test_log_scope.go @@ -380,7 +380,7 @@ func (l *TestLogScope) Rotate(t tShim) { t.Helper() t.Logf("-- test log scope file rotation --") // Ensure remaining logs are written. - Flush() + FlushFiles() if err := logging.allSinkInfos.iterFileSinks(func(l *fileSink) error { l.mu.Lock() @@ -402,7 +402,7 @@ func (l *TestLogScope) Close(t tShim) { t.Logf("-- test log scope end --") // Ensure any remaining logs are written to files. 
-	Flush()
+	FlushFiles()

 	if l.logDir != "" {
 		defer func() {

From f113fb41b80215dec3bd77ab11f67011ad1811fe Mon Sep 17 00:00:00 2001
From: Alex Barganier
Date: Fri, 18 Aug 2023 11:27:46 -0400
Subject: [PATCH 2/4] pkg/util/log: introduce `log.FlushAllSync()`, call in
 panic handler

Previously, our crash reporter system would flush file log sinks as
part of handling a panic. That flush was incomplete, since buffered
network sinks were not included. As a result, panic logs would often
fail to reach the network target, leading to a loss in observability.

This patch introduces `log.FlushAllSync()`, which flushes both file
and buffered network log sinks. It then updates the crash reporter to
call into this, instead of just flushing file log sinks.

`FlushAllSync()` contains timeout logic to prevent the flush from
blocking indefinitely if one of the underlying child sinks that a
bufferedSink wraps becomes unavailable/hangs on its `output()` call.

Release note: none
---
 pkg/cli/start.go                              |  4 +-
 .../schematelemetry/schema_telemetry_test.go  |  2 +-
 pkg/util/log/doc.go                           |  6 ++-
 pkg/util/log/log_flush.go                     | 42 +++++++++++++++++++
 pkg/util/log/logcrash/crash_reporting.go      |  2 +-
 pkg/util/log/registry.go                      | 13 ++++++
 6 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/pkg/cli/start.go b/pkg/cli/start.go
index b742bb4c8387..3176c7adac32 100644
--- a/pkg/cli/start.go
+++ b/pkg/cli/start.go
@@ -729,7 +729,7 @@ func createAndStartServerAsync(
 	go func() {
 		// Ensure that the log files see the startup messages immediately.
-		defer log.FlushFiles()
+		defer log.FlushAllSync()
 		// If anything goes dramatically wrong, use Go's panic/recover
 		// mechanism to intercept the panic and log the panic details to
 		// the error reporting server.
@@ -1497,7 +1497,7 @@ func reportReadinessExternally(ctx context.Context, cmd *cobra.Command, waitForI
 	// Ensure the configuration logging is written to disk in case a
 	// process is waiting for the sdnotify readiness to read important
 	// information from there.
-	log.FlushFiles()
+	log.FlushAllSync()

 	// Signal readiness. This unblocks the process when running with
 	// --background or under systemd.
diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go
index c90ef8225dcc..bfdd7febe05b 100644
--- a/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go
+++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go
@@ -178,7 +178,7 @@ UPDATE system.namespace SET id = %d WHERE id = %d;

 	// Ensure that our logs are flushed to disk before asserting about log
 	// entries.
-	log.Flush()
+	log.FlushFiles()

 	// Ensure that a log line is emitted for each invalid object, with a loose
 	// enforcement of the log structure.
diff --git a/pkg/util/log/doc.go b/pkg/util/log/doc.go
index dcaf9320b669..8f3b92c8f2d5 100644
--- a/pkg/util/log/doc.go
+++ b/pkg/util/log/doc.go
@@ -83,8 +83,10 @@
 //
 // # Output
 //
-// Log output is buffered and written periodically using FlushFiles. Programs
-// should call FlushFiles before exiting to guarantee all log output is written.
+// Log output is buffered and written periodically using FlushFiles.
+// Programs should call FlushFiles before exiting to guarantee all
+// log output is written to files. Note that buffered network sinks also
+// exist. If you'd like to flush these as well, call FlushAllSync.
 //
 // By default, all log statements write to files in a temporary directory.
 // This package provides several flags that modify this behavior.
diff --git a/pkg/util/log/log_flush.go b/pkg/util/log/log_flush.go
index a041fa427600..5bf42fc18714 100644
--- a/pkg/util/log/log_flush.go
+++ b/pkg/util/log/log_flush.go
@@ -12,6 +12,7 @@ package log

 import (
 	"context"
+	"fmt"
 	"io"
 	"time"

@@ -37,6 +38,47 @@ func FlushFiles() {
 	})
 }

+// FlushAllSync explicitly flushes all asynchronous buffered logging sinks,
+// including pending log file I/O and buffered network sinks.
+//
+// NB: This is a synchronous operation, and will block until all flushes
+// have completed. Generally only recommended for use in crash reporting
+// and shutdown scenarios. Note that `tryForceSync` is best effort, so the
+// possibility exists that a buffered log sink is unable to block until
+// the flush completes. In such a case though, the expectation is that a
+// flush is already imminent for that sink.
+//
+// Each sink flush is attempted with a timeout.
+func FlushAllSync() {
+	FlushFiles()
+	_ = logging.allSinkInfos.iterBufferedSinks(func(bs *bufferedSink) error {
+		// Trigger a synchronous flush by calling output on the bufferedSink
+		// with a `tryForceSync` option.
+		doneCh := make(chan struct{})
+		go func() {
+			err := bs.output([]byte{}, sinkOutputOptions{tryForceSync: true})
+			if err != nil {
+				fmt.Printf("Error draining buffered log sink %T: %v\n", bs.child, err)
+			}
+			doneCh <- struct{}{}
+		}()
+		// Don't wait forever if the underlying sink happens to be unavailable.
+		// Set a timeout to avoid holding up the panic handler for too long.
+		select {
+		case <-time.After(3 * time.Second):
+			fmt.Printf("Timed out waiting on buffered log sink %T to drain.\n", bs.child)
+		case <-doneCh:
+		}
+		// We don't want to let errors stop us from iterating and flushing
+		// the remaining buffered log sinks. Nor do we want to log the error
+		// using the logging system, as it's unlikely to make it to the
+		// destination sink anyway (there's a good chance we're flushing
+		// as part of handling a panic). Any error has already been printed
+		// above. Regardless, we return nil so the iteration continues.
+		return nil
+	})
+}
+
 func init() {
 	go flushDaemon()
 	go signalFlusher()
diff --git a/pkg/util/log/logcrash/crash_reporting.go b/pkg/util/log/logcrash/crash_reporting.go
index 6a1309099dcf..b995fdcf07aa 100644
--- a/pkg/util/log/logcrash/crash_reporting.go
+++ b/pkg/util/log/logcrash/crash_reporting.go
@@ -195,7 +195,7 @@ func ReportPanic(ctx context.Context, sv *settings.Values, r interface{}, depth

 	// Ensure that the logs are flushed before letting a panic
 	// terminate the server.
-	log.FlushFiles()
+	log.FlushAllSync()
 }

 // PanicAsError turns r into an error if it is not one already.
diff --git a/pkg/util/log/registry.go b/pkg/util/log/registry.go
index aae3e84ba8b2..dce3e20bac17 100644
--- a/pkg/util/log/registry.go
+++ b/pkg/util/log/registry.go
@@ -89,6 +89,19 @@ func (r *sinkInfoRegistry) iterFileSinks(fn func(l *fileSink) error) error {
 	})
 }

+// iterBufferedSinks iterates over all the buffered sinks and stops at the first
+// error encountered.
+func (r *sinkInfoRegistry) iterBufferedSinks(fn func(bs *bufferedSink) error) error {
+	return r.iter(func(si *sinkInfo) error {
+		if bs, ok := si.sink.(*bufferedSink); ok {
+			if err := fn(bs); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
 // put adds a sinkInfo into the registry.
func (r *sinkInfoRegistry) put(l *sinkInfo) { r.mu.Lock() From 297a1d30e80da74f040d30909c778117938a6d98 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 28 Aug 2023 08:46:55 +0000 Subject: [PATCH 3/4] rpc: increase gRPC server timeout from 1x to 2x NetworkTimeout This patch increases the gRPC server timeout from 1x to 2x NetworkTimeout. This timeout determines how long the server will wait for a TCP send to receive a TCP ack before automatically closing the connection. gRPC enforces this via the OS TCP stack by setting TCP_USER_TIMEOUT on the network socket. While NetworkTimeout should be sufficient here, we have seen instances where this is affected by node load or other factors, so we set it to 2x NetworkTimeout to avoid spurious closed connections. An aggressive timeout is not particularly beneficial here, because the client-side timeout (in our case the CRDB RPC heartbeat) is what matters for recovery time following network or node outages -- the server side doesn't really care if the connection remains open for a bit longer. Epic: none Release note (ops change): The default gRPC server-side send timeout has been increased from 2 seconds to 4 seconds (1x to 2x of COCKROACH_NETWORK_TIMEOUT), to avoid spurious connection failures in certain scenarios. This can be controlled via the new environment variable COCKROACH_RPC_SERVER_TIMEOUT. --- pkg/rpc/keepalive.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/rpc/keepalive.go b/pkg/rpc/keepalive.go index abfe2dbc313b..8e3d1427da04 100644 --- a/pkg/rpc/keepalive.go +++ b/pkg/rpc/keepalive.go @@ -14,9 +14,24 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "google.golang.org/grpc/keepalive" ) +// serverTimeout is how long the server will wait for a TCP send to receive a +// TCP ack before automatically closing the connection. gRPC enforces this via +// the OS TCP stack by setting TCP_USER_TIMEOUT on the network socket. +// +// While NetworkTimeout should be sufficient here, we have seen instances where +// this is affected by node load or other factors, so we set it to 2x +// NetworkTimeout to avoid spurious closed connections. An aggressive timeout is +// not particularly beneficial here, because the client-side timeout (in our +// case the CRDB RPC heartbeat) is what matters for recovery time following +// network or node outages -- the server side doesn't really care if the +// connection remains open for a bit longer. +var serverTimeout = envutil.EnvOrDefaultDuration( + "COCKROACH_RPC_SERVER_TIMEOUT", 2*base.NetworkTimeout) + // 10 seconds is the minimum keepalive interval permitted by gRPC. // Setting it to a value lower than this will lead to gRPC adjusting to this // value and annoyingly logging "Adjusting keepalive ping interval to minimum @@ -39,11 +54,9 @@ var clientKeepalive = keepalive.ClientParameters{ var serverKeepalive = keepalive.ServerParameters{ // Send periodic pings on the connection when there is no other traffic. Time: base.PingInterval, - // If the pings don't get a response within the timeout, we might be - // experiencing a network partition. gRPC will close the transport-level - // connection and all the pending RPCs (which may not have timeouts) will - // fail eagerly. - Timeout: base.NetworkTimeout, + // Close the connection if a TCP send (including a ping) does not receive a + // TCP ack within the given timeout. Enforced by the OS via TCP_USER_TIMEOUT. 
+ Timeout: serverTimeout, } // By default, gRPC disconnects clients that send "too many" pings, From e980fd134c0e0d96e2fe3c90c2d6d58e273828aa Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 28 Aug 2023 13:32:50 -0400 Subject: [PATCH 4/4] kv: remove assertions around non-txn'al locking reqs Closes #107860. Closes #109222. Closes #109581. Closes #109582. We might want to re-introduce these assertions in the future and reject these requests higher up the stack. For now, just remove them to deflake tests. Release note: None --- pkg/kv/kvserver/concurrency/lock_table.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index b234d235f096..3d8bf24432bb 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -801,19 +801,17 @@ func (g *lockTableGuardImpl) curLockMode() lock.Mode { // makeLockMode constructs and returns a lock mode. func makeLockMode(str lock.Strength, txn *roachpb.Transaction, ts hlc.Timestamp) lock.Mode { + iso := isolation.Serializable + if txn != nil { + iso = txn.IsoLevel + } switch str { case lock.None: - iso := isolation.Serializable - if txn != nil { - iso = txn.IsoLevel - } return lock.MakeModeNone(ts, iso) case lock.Shared: - assert(txn != nil, "only transactional requests can acquire shared locks") return lock.MakeModeShared() case lock.Exclusive: - assert(txn != nil, "only transactional requests can acquire exclusive locks") - return lock.MakeModeExclusive(ts, txn.IsoLevel) + return lock.MakeModeExclusive(ts, iso) case lock.Intent: return lock.MakeModeIntent(ts) default:
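
Taken together, patches 1 and 2 leave the logging package with two flush entry points. Below is a minimal sketch of how a caller might divide them between the ordinary exit path and the panic path. Only `log.FlushFiles` and `log.FlushAllSync` come from this series; the `run` function and its body are hypothetical.

```go
package main

import "github.com/cockroachdb/cockroach/pkg/util/log"

// run is a hypothetical command entry point inside the cockroach repo.
// On the ordinary exit path, flushing the file sinks is enough: buffered
// network sinks drain on their own cadence. On the panic path, the process
// is about to die, so the network sinks' buffers must be drained too.
func run() {
	// Ordinary exit: file sinks only (patch 1's rename).
	defer log.FlushFiles()

	defer func() {
		if r := recover(); r != nil {
			// Panic path: file sinks plus buffered network sinks, bounded
			// by per-sink timeouts (patch 2's log.FlushAllSync).
			log.FlushAllSync()
			panic(r) // re-panic so the crash still terminates the process
		}
	}()

	// ... the command's actual work ...
}

func main() { run() }
```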
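The timeout logic inside `FlushAllSync` is a reusable pattern: run the potentially-hanging call in a goroutine and `select` between its done channel and a timer. Here is a self-contained sketch of that pattern on its own; `slowFlush` is a hypothetical stand-in for `bufferedSink.output`, and the 3-second bound mirrors the timeout in the patch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// flushWithTimeout runs fn in a goroutine and waits at most d for it to
// finish, mirroring the select in FlushAllSync: the caller is never
// blocked indefinitely by a hung sink.
func flushWithTimeout(name string, d time.Duration, fn func() error) {
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		if err := fn(); err != nil {
			fmt.Printf("error draining %s: %v\n", name, err)
		}
	}()
	select {
	case <-time.After(d):
		fmt.Printf("timed out waiting on %s to drain\n", name)
	case <-doneCh:
	}
}

func main() {
	// A hypothetical sink flush that hangs longer than the timeout.
	slowFlush := func() error {
		time.Sleep(5 * time.Second)
		return errors.New("unreachable in this run")
	}
	flushWithTimeout("buffered network sink", 3*time.Second, slowFlush)
}
```

As in the patch, a timed-out flush leaks its goroutine; that is an acceptable trade-off when the process is about to terminate anyway.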
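Patch 3's `serverTimeout` feeds gRPC's standard keepalive surface. The sketch below shows a server configured the same way, assuming an illustrative 2-second network timeout rather than CockroachDB's actual wiring; the listener address and intervals are placeholders.

```go
package main

import (
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func main() {
	const networkTimeout = 2 * time.Second

	srv := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
		// Ping idle connections periodically so a dead peer is detected
		// even when there is no application traffic.
		Time: time.Minute,
		// Close the connection if a TCP send (including a ping) is not
		// acked within the timeout. gRPC enforces this via TCP_USER_TIMEOUT
		// on the socket; 2x the network timeout absorbs ack delays caused
		// by node load, per the commit message above.
		Timeout: 2 * networkTimeout,
	}))

	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	_ = srv.Serve(lis)
}
```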