109186: pkg/util/log: flush buffered network sinks on panic r=knz a=abarganier

Previously, our crash reporter system would flush file log sinks
as part of the process to handle a panic.

This was incomplete, since buffered network sinks were not included
in this flush. As a result, panic logs would often fail to reach the
network target, leading to a loss in observability.

This patch introduces `log.FlushAllSync()`, which flushes both file
and buffered network log sinks. It then updates the crash reporter
to call into this, instead of just flushing file log sinks.

`FlushAllSync()` contains timeout logic to prevent the process from
hanging if one of the underlying child sinks that a bufferedSink
wraps becomes unavailable or hangs on its `output()` call.

We originally attempted to fix this in #101562, but a bug in the 
bufferedSink code led us to roll back those changes. The bug in the 
bufferedSink code has since been fixed (#108928), so we can safely 
introduce this logic again.

Release note: none

Fixes: #106345

109578: rpc: increase gRPC server timeout from 1x to 2x NetworkTimeout r=andrewbaptist a=erikgrinaker

This is intended as a conservative backport that changes as little as possible. For 23.2, we should restructure these settings a bit, possibly by removing NetworkTimeout and using independent timeouts for each component/parameter, since they have unique considerations (e.g. whether they are enforced above the Go runtime or by the OS, to what extent they are subject to RPC head-of-line blocking, etc).

---

This patch increases the gRPC server timeout from 1x to 2x NetworkTimeout. This timeout determines how long the server will wait for a TCP send to receive a TCP ack before automatically closing the connection. gRPC enforces this via the OS TCP stack by setting TCP_USER_TIMEOUT on the network socket.

While NetworkTimeout should be sufficient here, we have seen instances where this is affected by node load or other factors, so we set it to 2x NetworkTimeout to avoid spurious closed connections. An aggressive timeout is not particularly beneficial here, because the client-side timeout (in our case the CRDB RPC heartbeat) is what matters for recovery time following network or node outages -- the server side doesn't really care if the connection remains open for a bit longer.

Touches #109317.

Epic: none
Release note (ops change): The default gRPC server-side send timeout has been increased from 2 seconds to 4 seconds (1x to 2x of COCKROACH_NETWORK_TIMEOUT), to avoid spurious connection failures in certain scenarios. This can be controlled via the new environment variable COCKROACH_RPC_SERVER_TIMEOUT.

109610: kv: remove assertions around non-txn'al locking reqs r=nvanbenschoten a=nvanbenschoten

Closes #107860.
Closes #109222.
Closes #109581.
Closes #109582.

We might want to re-introduce these assertions in the future and reject these requests higher up the stack. For now, just remove them to deflake tests.
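
In the `lock_table.go` change, removing the assertions goes together with a nil-safe default: the isolation level is resolved once, falling back to serializable when the request carries no transaction. A simplified sketch of that pattern, with the hypothetical `isoLevel`, `txn`, and `effectiveIso` standing in for the real `isolation` and `roachpb` types:

```go
package main

import "fmt"

// isoLevel and txn are simplified, hypothetical stand-ins for the
// isolation level and transaction types used by the lock table.
type isoLevel string

const (
	serializable  isoLevel = "serializable"
	readCommitted isoLevel = "read committed"
)

type txn struct{ iso isoLevel }

// effectiveIso returns the transaction's isolation level, or
// serializable for a non-transactional request (nil txn), instead of
// asserting that a transaction is present.
func effectiveIso(t *txn) isoLevel {
	iso := serializable
	if t != nil {
		iso = t.iso
	}
	return iso
}

func main() {
	fmt.Println(effectiveIso(nil))                      // serializable
	fmt.Println(effectiveIso(&txn{iso: readCommitted})) // read committed
}
```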

Release note: None

Co-authored-by: Alex Barganier <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
4 people committed Aug 28, 2023
4 parents 1577334 + f113fb4 + 297a1d3 + e980fd1 commit 6a31120
Showing 43 changed files with 157 additions and 89 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/auditloggingccl/audit_logging_test.go
@@ -62,7 +62,7 @@ func TestRoleBasedAuditEnterpriseGated(t *testing.T) {
// Run a test query.
rootRunner.Exec(t, `SHOW CLUSTER SETTING sql.log.user_audit`)

-log.Flush()
+log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
@@ -87,7 +87,7 @@ func TestRoleBasedAuditEnterpriseGated(t *testing.T) {
// Run a test query.
rootRunner.Exec(t, `SHOW CLUSTER SETTING sql.log.user_audit`)

-log.Flush()
+log.FlushFiles()

entries, err = log.FetchEntriesFromFiles(
0,
@@ -214,7 +214,7 @@ func TestSingleRoleAuditLogging(t *testing.T) {
rootRunner.Exec(t, fmt.Sprintf("REVOKE %s from testuser", td.role))
}

-log.Flush()
+log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
@@ -317,7 +317,7 @@ func TestMultiRoleAuditLogging(t *testing.T) {
testRunner.Exec(t, query)
}

-log.Flush()
+log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
@@ -412,7 +412,7 @@ func TestReducedAuditConfig(t *testing.T) {
// for the user at that time.
testRunner.Exec(t, testQuery)

-log.Flush()
+log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
@@ -439,7 +439,7 @@ func TestReducedAuditConfig(t *testing.T) {
// The user now has a corresponding audit setting. We use a new query here to differentiate.
testRunner2.Exec(t, `GRANT SELECT ON TABLE u TO root`)

-log.Flush()
+log.FlushFiles()

entries, err = log.FetchEntriesFromFiles(
0,
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/utils_test.go
@@ -537,7 +537,7 @@ func requireRecoveryEvent(
expected eventpb.RecoveryEvent,
) {
testutils.SucceedsSoon(t, func() error {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(
startTime,
math.MaxInt64,
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1725,7 +1725,7 @@ func TestChangefeedSchemaChangeNoBackfill(t *testing.T) {

cdcTest(t, testFn)

-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"),
log.WithFlattenedSensitiveData)
if err != nil {
@@ -2127,7 +2127,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)

-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
@@ -2323,7 +2323,7 @@ func TestChangefeedSchemaChangeAllowBackfill_Legacy(t *testing.T) {

cdcTestWithSystem(t, testFn)

-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
@@ -2480,7 +2480,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {

cdcTestWithSystem(t, testFn)

-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
@@ -2543,7 +2543,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
}

cdcTestWithSystem(t, testFn)
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
@@ -2574,7 +2574,7 @@ func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
}

cdcTest(t, testFn)
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
@@ -1176,7 +1176,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`)
func checkStructuredLogs(t *testing.T, eventType string, startTime int64) []string {
var matchingEntries []string
testutils.SucceedsSoon(t, func() error {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(startTime,
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/nemeses_test.go
@@ -54,7 +54,7 @@ func TestChangefeedNemeses(t *testing.T) {
//
// TODO(knz): This seems incorrect, see issue #109417.
cdcTest(t, testFn, feedTestNoTenants)
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
4 changes: 2 additions & 2 deletions pkg/ccl/telemetryccl/telemetry_logging_test.go
@@ -107,7 +107,7 @@ func TestTelemetryLogRegions(t *testing.T) {
sqlDB.Exec(t, tc.query)
}

-log.Flush()
+log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(
0,
@@ -322,7 +322,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
execTimestamp++
}

-log.Flush()
+log.FlushFiles()

var filteredSampleQueries []logpb.Entry
testutils.SucceedsSoon(t, func() error {
4 changes: 2 additions & 2 deletions pkg/ccl/testccl/authccl/auth_test.go
@@ -526,7 +526,7 @@ func TestClientAddrOverride(t *testing.T) {
t.Run("check-server-log-uses-override", func(t *testing.T) {
// Wait for the disconnection event in logs.
testutils.SucceedsSoon(t, func() error {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, sessionTerminatedRe,
log.WithMarkedSensitiveData)
if err != nil {
@@ -539,7 +539,7 @@ func TestClientAddrOverride(t *testing.T) {
})

// Now we want to check that the logging tags are also updated.
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTime.UnixNano(), math.MaxInt64, 10000, authLogFileRe,
log.WithMarkedSensitiveData)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/testccl/sqlccl/tenant_gc_test.go
@@ -184,7 +184,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) {

checkGCBlockedByPTS := func(t *testing.T, sj *jobs.StartableJob, tenID uint64) {
testutils.SucceedsSoon(t, func() error {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile(fmt.Sprintf("GC TTL for dropped tenant %d has expired, but protected timestamp record\\(s\\)", tenID)),
log.WithFlattenedSensitiveData)
2 changes: 1 addition & 1 deletion pkg/cli/connect.go
@@ -76,7 +76,7 @@ func runConnectInit(cmd *cobra.Command, args []string) (retErr error) {
}

// Ensure that log files are populated when the process terminates.
-defer log.Flush()
+defer log.FlushFiles()

peers := []string(serverCfg.JoinList)
ctx, cancel := context.WithCancel(context.Background())
2 changes: 1 addition & 1 deletion pkg/cli/debug_send_kv_batch_test.go
@@ -123,7 +123,7 @@ func TestSendKVBatch(t *testing.T) {
require.JSONEq(t, jsonResponse, output)

// Check that a structured log event was emitted.
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(start.UnixNano(), timeutil.Now().UnixNano(), 1,
regexp.MustCompile("debug_send_kv_batch"), log.WithFlattenedSensitiveData)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion pkg/cli/demo.go
@@ -377,7 +377,7 @@ func runDemoInternal(
}

// Ensure the last few entries in the log files are flushed at the end.
-defer log.Flush()
+defer log.FlushFiles()

return sqlCtx.Run(ctx, conn)
}
4 changes: 2 additions & 2 deletions pkg/cli/start.go
@@ -729,7 +729,7 @@ func createAndStartServerAsync(

go func() {
// Ensure that the log files see the startup messages immediately.
-defer log.Flush()
+defer log.FlushAllSync()
// If anything goes dramatically wrong, use Go's panic/recover
// mechanism to intercept the panic and log the panic details to
// the error reporting server.
@@ -1497,7 +1497,7 @@ func reportReadinessExternally(ctx context.Context, cmd *cobra.Command, waitForI
// Ensure the configuration logging is written to disk in case a
// process is waiting for the sdnotify readiness to read important
// information from there.
-log.Flush()
+log.FlushAllSync()

// Signal readiness. This unblocks the process when running with
// --background or under systemd.
2 changes: 1 addition & 1 deletion pkg/jobs/jobstest/logutils.go
@@ -36,7 +36,7 @@ func CheckEmittedEvents(
) {
// Check that the structured event was logged.
testutils.SucceedsSoon(t, func() error {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(startTime,
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
@@ -554,7 +554,7 @@ SELECT unnest(execution_errors)
t *testing.T, id jobspb.JobID, status jobs.Status,
from, to time.Time, cause string,
) {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(
from.UnixNano(), to.UnixNano(), 2,
regexp.MustCompile(fmt.Sprintf(
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
@@ -1569,7 +1569,7 @@ func TestReceiveSnapshotLogging(t *testing.T) {
// When ready, flush logs and check messages from store_raft.go since
// call to repl.ChangeReplicas(..).
<-signals.receiverDoneCh
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`store_raft\.go`), log.WithMarkedSensitiveData)
require.NoError(t, err)
12 changes: 5 additions & 7 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -801,19 +801,17 @@ func (g *lockTableGuardImpl) curLockMode() lock.Mode {

// makeLockMode constructs and returns a lock mode.
func makeLockMode(str lock.Strength, txn *roachpb.Transaction, ts hlc.Timestamp) lock.Mode {
+iso := isolation.Serializable
+if txn != nil {
+iso = txn.IsoLevel
+}
switch str {
case lock.None:
-iso := isolation.Serializable
-if txn != nil {
-iso = txn.IsoLevel
-}
return lock.MakeModeNone(ts, iso)
case lock.Shared:
-assert(txn != nil, "only transactional requests can acquire shared locks")
return lock.MakeModeShared()
case lock.Exclusive:
-assert(txn != nil, "only transactional requests can acquire exclusive locks")
-return lock.MakeModeExclusive(ts, txn.IsoLevel)
+return lock.MakeModeExclusive(ts, iso)
case lock.Intent:
return lock.MakeModeIntent(ts)
default:
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
@@ -652,7 +652,7 @@ func TestCorruptData(t *testing.T) {
require.NoError(t, err)
}

-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg,
log.WithFlattenedSensitiveData)
require.NoError(t, err)
@@ -739,7 +739,7 @@ func TestCorruptData(t *testing.T) {
require.Nil(t, got)
_, err = pts.GetState(ctx)
require.NoError(t, err)
-log.Flush()
+log.FlushFiles()

entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 100, msg,
log.WithFlattenedSensitiveData)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue_test.go
@@ -885,7 +885,7 @@ func TestReplicateQueueTracingOnError(t *testing.T) {

// Flush logs and get log messages from replicate_queue.go since just
// before calling store.Enqueue(..).
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData)
require.NoError(t, err)
23 changes: 18 additions & 5 deletions pkg/rpc/keepalive.go
@@ -14,9 +14,24 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
+"github.com/cockroachdb/cockroach/pkg/util/envutil"
"google.golang.org/grpc/keepalive"
)

+// serverTimeout is how long the server will wait for a TCP send to receive a
+// TCP ack before automatically closing the connection. gRPC enforces this via
+// the OS TCP stack by setting TCP_USER_TIMEOUT on the network socket.
+//
+// While NetworkTimeout should be sufficient here, we have seen instances where
+// this is affected by node load or other factors, so we set it to 2x
+// NetworkTimeout to avoid spurious closed connections. An aggressive timeout is
+// not particularly beneficial here, because the client-side timeout (in our
+// case the CRDB RPC heartbeat) is what matters for recovery time following
+// network or node outages -- the server side doesn't really care if the
+// connection remains open for a bit longer.
+var serverTimeout = envutil.EnvOrDefaultDuration(
+"COCKROACH_RPC_SERVER_TIMEOUT", 2*base.NetworkTimeout)
+
// 10 seconds is the minimum keepalive interval permitted by gRPC.
// Setting it to a value lower than this will lead to gRPC adjusting to this
// value and annoyingly logging "Adjusting keepalive ping interval to minimum
@@ -39,11 +54,9 @@ var clientKeepalive = keepalive.ClientParameters{
var serverKeepalive = keepalive.ServerParameters{
// Send periodic pings on the connection when there is no other traffic.
Time: base.PingInterval,
-// If the pings don't get a response within the timeout, we might be
-// experiencing a network partition. gRPC will close the transport-level
-// connection and all the pending RPCs (which may not have timeouts) will
-// fail eagerly.
-Timeout: base.NetworkTimeout,
+// Close the connection if a TCP send (including a ping) does not receive a
+// TCP ack within the given timeout. Enforced by the OS via TCP_USER_TIMEOUT.
+Timeout: serverTimeout,
}

// By default, gRPC disconnects clients that send "too many" pings,
2 changes: 1 addition & 1 deletion pkg/security/certmgr/cert_manager_test.go
@@ -57,7 +57,7 @@ var cmLogRe = regexp.MustCompile(`event_log\.go`)

// Check that the structured event was logged.
func checkLogStructEntry(t *testing.T, expectSuccess bool, beforeReload time.Time) error {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/security/certs_rotation_test.go
@@ -199,7 +199,7 @@ func TestRotateCerts(t *testing.T) {
// the moment the structured logging event is actually
// written to the log file.
testutils.SucceedsSoon(t, func() error {
-log.Flush()
+log.FlushFiles()
entries, err := log.FetchEntriesFromFiles(beforeReload.UnixNano(),
math.MaxInt64, 10000, cmLogRe, log.WithMarkedSensitiveData)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
@@ -542,7 +542,7 @@ func TestPersistHLCUpperBound(t *testing.T) {
var fatal bool
defer log.ResetExitFunc()
log.SetExitFunc(true /* hideStack */, func(r exit.Code) {
-defer log.Flush()
+defer log.FlushFiles()
if r == exit.FatalError() {
fatal = true
}
6 changes: 3 additions & 3 deletions pkg/server/status.go
@@ -1237,7 +1237,7 @@ func (s *statusServer) LogFilesList(
}
return status.LogFilesList(ctx, req)
}
-log.Flush()
+log.FlushFiles()
logFiles, err := log.ListLogFiles()
if err != nil {
return nil, srverrors.ServerError(ctx, err)
@@ -1277,7 +1277,7 @@ func (s *statusServer) LogFile(
inputEditMode := log.SelectEditMode(req.Redact, log.KeepRedactable)

// Ensure that the latest log entries are available in files.
-log.Flush()
+log.FlushFiles()

// Read the logs.
reader, err := log.GetLogReader(req.File)
@@ -1407,7 +1407,7 @@ func (s *statusServer) Logs(
}

// Ensure that the latest log entries are available in files.
-log.Flush()
+log.FlushFiles()

// Read the logs.
entries, err := log.FetchEntriesFromFiles(