124885: kv: retry liveness heartbeat on race with insufficient expiration r=nvanbenschoten a=nvanbenschoten

Fixes #124693.
Fixes #125287.

This commit adds logic to retry the synchronous liveness heartbeat (added in #123442) that ensures the liveness record has a later expiration than the prior lease by the time the lease promotion goes into effect. This heartbeat may need to be retried because it may have raced with another liveness heartbeat that did not extend the liveness expiration far enough.

We opt to allow this race and retry across it instead of detecting it and returning an error from `NodeLiveness.Heartbeat` because:
1. returning an error would have a larger blast radius and could cause other issues.
2. returning an error wouldn't actually fix the tests that are failing, because they would still get an error, just a different one.
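In rough terms, the promotion path now loops until its own liveness record outlives the lease being replaced. The sketch below is a simplified, self-contained model of that retry; the types and the simulated race are illustrative only, and the real change lives in `requestLease` in `pkg/kv/kvserver/replica_range_lease.go` (see the diff further down).

```go
package main

import (
	"fmt"
	"time"
)

// record and lease stand in for the real liveness record and lease; only
// their expirations matter for this sketch.
type record struct{ expiration time.Time }
type lease struct{ expiration time.Time }

// heartbeat models NodeLiveness.Heartbeat. On the first attempt it simulates
// losing the race: another heartbeater already extended the record, but not
// past the previous lease's expiration.
func heartbeat(cur record, attempt int, now time.Time) record {
	if attempt == 0 {
		return record{expiration: now.Add(1 * time.Second)}
	}
	return record{expiration: now.Add(9 * time.Second)}
}

func main() {
	now := time.Now()
	prev := lease{expiration: now.Add(5 * time.Second)}
	cur := record{expiration: now}

	// Retry until the liveness record's expiration exceeds the expiration of
	// the lease being promoted, mirroring the loop added by this commit.
	for attempt := 0; ; attempt++ {
		cur = heartbeat(cur, attempt, now)
		if cur.expiration.Before(prev.expiration) {
			fmt.Printf("attempt %d: liveness expires before the old lease, retrying\n", attempt)
			continue
		}
		fmt.Printf("attempt %d: liveness outlives the old lease, promotion can proceed\n", attempt)
		break
	}
}
```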

Before this commit, `TestLeaseQueueProactiveEnqueueOnPreferences` would hit this case and fail under deadlock and stress roughly every 150 iterations. After this commit, the test passes continuously under deadlock stress for over 2000 runs.

This makes #123442 even uglier. The nicer solution is #125235, but that is not backportable. Still, we're planning to address that issue as part of the leader leases work, so this is a temporary fix.

This also removes a TODO added in 1dc18df. As mentioned above, we don't address that TODO; instead, we document the behavior.

Release note (bug fix): resolves a concerning log message that says "expiration of liveness record ... is not greater than expiration of the previous lease ... after liveness heartbeat". This message is no longer possible.

125309: streamingccl: fix metric name r=msbutler a=stevendanna

This metric was re-using the name of another metric.
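For context, the collision looks roughly like this toy model. The name-keyed registry below is illustrative; the real `metric.Metadata` values are in the `pkg/ccl/streamingccl/logical/metrics.go` diff further down.

```go
package main

import "fmt"

// metadata is a stand-in for metric.Metadata; only the name matters here.
type metadata struct{ name, help string }

// register models a name-keyed metric registry: if two metrics share a name,
// their time series can no longer be told apart.
func register(reg map[string]metadata, m metadata) {
	if existing, ok := reg[m.name]; ok {
		fmt.Printf("duplicate name %q: %q vs %q\n", m.name, existing.help, m.help)
		return
	}
	reg[m.name] = m
}

func main() {
	reg := map[string]metadata{}
	register(reg, metadata{"logical_replication.flush_wait_nanos", "Time spent waiting for an in-progress flush"})
	// Before the fix, the batch histogram reused the flush_wait_nanos name.
	register(reg, metadata{"logical_replication.flush_wait_nanos", "Time spent flushing a batch"})
	// After the fix, it registers under its own name.
	register(reg, metadata{"logical_replication.batch_hist_nanos", "Time spent flushing a batch"})
}
```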

Epic: none
Release note: None

125369: streamproducer: add emit_wait and produce_wait to vtable r=rickystewart a=dt

Release note: none.
Epic: none.
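Once the `emit_wait` and `produce_wait` columns from this change exist, they can be inspected with an ordinary SQL query. A minimal sketch using `database/sql` follows; the connection string and driver choice are illustrative, and only columns visible in this diff are selected.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any Postgres-wire driver works; lib/pq is just an example
)

func main() {
	// Illustrative connection string; point it at any node in the cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	rows, err := db.Query(`
		SELECT produce_wait, emit_wait, last_produce_wait, last_emit_wait
		FROM crdb_internal.cluster_replication_node_streams`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var produceWait, emitWait, lastProduceWait, lastEmitWait string
		if err := rows.Scan(&produceWait, &emitWait, &lastProduceWait, &lastEmitWait); err != nil {
			log.Fatal(err)
		}
		fmt.Println(produceWait, emitWait, lastProduceWait, lastEmitWait)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```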

125411: streamingccl: cleanup unit test terminology r=stevendanna a=stevendanna

Mixing source/target with serverA/serverB was a bit confusing.

Epic: none
Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: David Taylor <[email protected]>
4 people committed Jun 10, 2024
5 parents 9e2e4fb + b2ac52a + 0c7766e + 6fbb40c + 4b0f3d8 commit d554c0e
Showing 10 changed files with 116 additions and 59 deletions.
3 changes: 2 additions & 1 deletion docs/generated/metrics/metrics.html
@@ -1333,6 +1333,7 @@
<tr><td>APPLICATION</td><td>kv.protectedts.reconciliation.records_removed</td><td>number of records removed during reconciliation runs on this node</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.admit_latency</td><td>Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.batch_bytes</td><td>Number of bytes in a given batch</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.batch_hist_nanos</td><td>Time spent flushing a batch</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.checkpoint_events_ingested</td><td>Checkpoint events ingested by all replication jobs</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.commit_latency</td><td>Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.distsql_replan_count</td><td>Total number of dist sql replanning events</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
@@ -1342,7 +1343,7 @@
<tr><td>APPLICATION</td><td>logical_replication.flush_on_size</td><td>Number of flushes caused by hitting the buffer size limit</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_on_time</td><td>Number of flushes caused by hitting the time limit</td><td>Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_row_count</td><td>Number of rows in a given flush</td><td>Rows</td><td>HISTOGRAM</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_wait_nanos</td><td>Time spent flushing a batch</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flush_wait_nanos</td><td>Time spent waiting for an in-progress flush</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.flushes</td><td>Total flushes across all replication jobs</td><td>Flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.job_progress_updates</td><td>Total number of updates to the ingestion job progress</td><td>Job Updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>logical_replication.logical_bytes</td><td>Logical bytes (sum of keys + values) ingested by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
54 changes: 27 additions & 27 deletions pkg/ccl/streamingccl/logical/logical_replication_job_test.go
@@ -48,8 +48,8 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
serverB := testcluster.StartTestCluster(t, 1, clusterArgs)
defer serverB.Stopper().Stop(ctx)

source := sqlutils.MakeSQLRunner(serverA.Server(0).ApplicationLayer().SQLConn(t))
target := sqlutils.MakeSQLRunner(serverB.Server(0).ApplicationLayer().SQLConn(t))
serverASQL := sqlutils.MakeSQLRunner(serverA.Server(0).ApplicationLayer().SQLConn(t))
serverBSQL := sqlutils.MakeSQLRunner(serverB.Server(0).ApplicationLayer().SQLConn(t))

for _, s := range []string{
"SET CLUSTER SETTING kv.rangefeed.enabled = true",
@@ -65,17 +65,17 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
"SET CLUSTER SETTING logical_replication.consumer.minimum_flush_interval = '10ms'",
"SET CLUSTER SETTING logical_replication.consumer.timestamp_granularity = '100ms'",
} {
source.Exec(t, s)
target.Exec(t, s)
serverASQL.Exec(t, s)
serverBSQL.Exec(t, s)
}

source.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
target.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
source.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
target.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
serverASQL.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
serverBSQL.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
serverASQL.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
serverBSQL.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")

source.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
target.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')")
serverASQL.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
serverBSQL.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')")

serverAURL, cleanup := sqlutils.PGUrl(t, serverA.Server(0).ApplicationLayer().SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
@@ -86,31 +86,31 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
jobAID jobspb.JobID
jobBID jobspb.JobID
)
target.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverAURL.String(), `ARRAY['tab']`)).Scan(&jobAID)
source.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverBURL.String(), `ARRAY['tab']`)).Scan(&jobBID)
serverASQL.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverBURL.String(), `ARRAY['tab']`)).Scan(&jobAID)
serverBSQL.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverAURL.String(), `ARRAY['tab']`)).Scan(&jobBID)

now := serverA.Server(0).Clock().Now()
t.Logf("waiting for replication job %d", jobAID)
WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), target, jobAID)
WaitUntilReplicatedTime(t, now, serverASQL, jobAID)
t.Logf("waiting for replication job %d", jobBID)
WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), source, jobBID)
WaitUntilReplicatedTime(t, now, serverBSQL, jobBID)

source.Exec(t, "INSERT INTO tab VALUES (2, 'potato')")
target.Exec(t, "INSERT INTO tab VALUES (3, 'celeriac')")
source.Exec(t, "UPSERT INTO tab VALUES (1, 'hello, again')")
target.Exec(t, "UPSERT INTO tab VALUES (1, 'goodbye, again')")
serverASQL.Exec(t, "INSERT INTO tab VALUES (2, 'potato')")
serverBSQL.Exec(t, "INSERT INTO tab VALUES (3, 'celeriac')")
serverASQL.Exec(t, "UPSERT INTO tab VALUES (1, 'hello, again')")
serverBSQL.Exec(t, "UPSERT INTO tab VALUES (1, 'goodbye, again')")

WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), target, jobAID)
WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), source, jobBID)
now = serverA.Server(0).Clock().Now()
WaitUntilReplicatedTime(t, now, serverASQL, jobAID)
WaitUntilReplicatedTime(t, now, serverBSQL, jobBID)

target.CheckQueryResults(t, "SELECT * from tab", [][]string{
expectedRows := [][]string{
{"1", "goodbye, again"},
{"2", "potato"},
{"3", "celeriac"},
})
source.CheckQueryResults(t, "SELECT * from tab", [][]string{
{"1", "goodbye, again"},
{"2", "potato"},
{"3", "celeriac"},
})
}
serverBSQL.CheckQueryResults(t, "SELECT * from tab", expectedRows)
serverASQL.CheckQueryResults(t, "SELECT * from tab", expectedRows)
}

func WaitUntilReplicatedTime(
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/logical/metrics.go
@@ -121,7 +121,7 @@ var (
Unit: metric.Unit_BYTES,
}
metaReplicationBatchHistNanos = metric.Metadata{
Name: "logical_replication.flush_wait_nanos",
Name: "logical_replication.batch_hist_nanos",
Help: "Time spent flushing a batch",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
14 changes: 14 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -72,6 +72,8 @@ type eventStream struct {
lastCheckpointTime time.Time
lastCheckpointLen int

lastPolled time.Time

debug streampb.DebugProducerStatus
}

@@ -111,6 +113,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
return errors.AssertionFailedf("expected to be started once")
}

s.lastPolled = timeutil.Now()

sourceTenantID, err := s.validateProducerJobAndSpec(ctx)
if err != nil {
return err
@@ -215,6 +219,12 @@ func (s *eventStream) setErr(err error) bool {

// Next implements eval.ValueGenerator interface.
func (s *eventStream) Next(ctx context.Context) (bool, error) {
emitWait := int64(timeutil.Since(s.lastPolled))

s.debug.Flushes.LastEmitWaitNanos.Store(emitWait)
s.debug.Flushes.EmitWaitNanos.Add(emitWait)
s.lastPolled = timeutil.Now()

select {
case <-ctx.Done():
return false, ctx.Err()
@@ -226,6 +236,10 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) {
case err := <-s.errCh:
return false, err
default:
produceWait := int64(timeutil.Since(s.lastPolled))
s.debug.Flushes.ProduceWaitNanos.Add(produceWait)
s.debug.Flushes.LastProduceWaitNanos.Store(produceWait)
s.lastPolled = timeutil.Now()
return true, nil
}
}
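The instrumentation above splits waiting time into two buckets: emit wait (time spent waiting for the consumer to call `Next` again) and produce wait (time spent inside `Next` waiting for data to hand back). Below is a simplified, self-contained sketch of the same accounting pattern; the names are illustrative and not the real `streampb.DebugProducerStatus` fields.

```go
package main

import (
	"fmt"
	"time"
)

// waitStats mirrors the idea behind the new emit_wait / produce_wait columns.
type waitStats struct {
	emitWait, produceWait time.Duration
}

type producer struct {
	data       chan string
	lastPolled time.Time
	stats      waitStats
}

// Next mirrors eventStream.Next: measure how long the consumer took to come
// back (emit wait), then how long we block waiting for a value (produce wait).
func (p *producer) Next() (string, bool) {
	p.stats.emitWait += time.Since(p.lastPolled)
	p.lastPolled = time.Now()

	v, ok := <-p.data
	p.stats.produceWait += time.Since(p.lastPolled)
	p.lastPolled = time.Now()
	return v, ok
}

func main() {
	p := &producer{data: make(chan string, 1), lastPolled: time.Now()}
	go func() {
		for _, v := range []string{"a", "b", "c"} {
			time.Sleep(20 * time.Millisecond) // slow producer -> produce wait grows
			p.data <- v
		}
		close(p.data)
	}()
	for {
		v, ok := p.Next()
		if !ok {
			break
		}
		time.Sleep(10 * time.Millisecond) // slow consumer -> emit wait grows
		_ = v
	}
	fmt.Printf("emit wait: %s, produce wait: %s\n", p.stats.emitWait, p.stats.produceWait)
}
```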
4 changes: 4 additions & 0 deletions pkg/cli/zip_table_registry.go
@@ -384,6 +384,10 @@ var zipInternalTablesPerCluster = DebugZipTableRegistry{
"batches",
"checkpoints",
"megabytes",
"produce_wait",
"emit_wait",
"last_produce_wait",
"last_emit_wait",
"last_checkpoint",
"rf_checkpoints",
"rf_advances",
15 changes: 3 additions & 12 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -701,7 +701,9 @@ var errNodeAlreadyLive = errors.New("node already live")
//
// If this method returns nil, the node's liveness has been extended,
// relative to the previous value. It may or may not still be alive
// when this method returns.
// when this method returns. It may also not have been extended as far
// as the livenessThreshold, because the caller may have raced with
// another heartbeater.
//
// On failure, this method returns ErrEpochIncremented, although this
// may not necessarily mean that the epoch was actually incremented.
@@ -839,17 +841,6 @@ func (nl *NodeLiveness) heartbeatInternal(
// expired while in flight, so maybe we don't have to care about
// that and only need to distinguish between same and different
// epochs in our return value.
//
// TODO(nvanbenschoten): Unlike the early return above, this doesn't
// guarantee that the resulting expiration is past minExpiration,
// only that it's different than our oldLiveness. Is that ok? It
// hasn't caused issues so far, but we might want to detect this
// case and retry, at least in the case of the liveness heartbeat
// loop. The downside of this is that a heartbeat that's intending
// to bump the expiration of a record out 9s into the future may
// return a success even if the expiration is only 5 seconds in the
// future. The next heartbeat will then start with only 0.5 seconds
// before expiration.
if actual.IsLive(nl.clock.Now()) && !incrementEpoch {
return errNodeAlreadyLive
}
37 changes: 24 additions & 13 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -538,20 +538,31 @@ func (p *pendingLeaseRequest) requestLease(
// interval are the same.
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
curLiveness := reqLeaseLiveness
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, curLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
// Assert that the liveness record expiration is now greater than the
// expiration of the lease we're promoting.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat", l, status.Lease)
// Check whether the liveness record expiration is now greater than the
// expiration of the lease we're promoting. If not, we may have raced with
// another liveness heartbeat which did not extend the liveness expiration
// far enough and we should try again.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok {
return errors.NewAssertionErrorWithWrappedErrf(liveness.ErrRecordCacheMiss, "after heartbeat")
}
if l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
log.Infof(ctx, "expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat, retrying...", l, status.Lease)
curLiveness = l.Liveness
continue
}
break
}
}

3 changes: 2 additions & 1 deletion pkg/repstream/streampb/empty.go
@@ -24,7 +24,8 @@ type DebugProducerStatus struct {
ResolvedMicros atomic.Int64
}
Flushes struct {
Batches, Checkpoints, Bytes atomic.Int64
Batches, Checkpoints, Bytes, EmitWaitNanos, ProduceWaitNanos atomic.Int64
LastProduceWaitNanos, LastEmitWaitNanos atomic.Int64
}
LastCheckpoint struct {
Micros atomic.Int64
41 changes: 38 additions & 3 deletions pkg/sql/crdb_internal.go
@@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -9028,6 +9029,11 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
megabytes FLOAT,
last_checkpoint INTERVAL,
produce_wait INTERVAL,
emit_wait INTERVAL,
last_produce_wait INTERVAL,
last_emit_wait INTERVAL,
rf_checkpoints INT,
rf_advances INT,
rf_last_advance INTERVAL,
@@ -9052,25 +9058,54 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
return tree.NewDInterval(duration.Age(now, t), types.DefaultIntervalTypeMetadata)
}

// Transform `.0000000000` to `.0` to shorten/de-noise HLCs.
shortenLogical := func(d *tree.DDecimal) *tree.DDecimal {
var tmp apd.Decimal
d.Modf(nil, &tmp)
if tmp.IsZero() {
if _, err := tree.DecimalCtx.Quantize(&tmp, &d.Decimal, -1); err == nil {
d.Decimal = tmp
}
}
return d
}

for _, s := range sm.DebugGetProducerStatuses(ctx) {
resolved := time.UnixMicro(s.RF.ResolvedMicros.Load())
resolvedDatum := tree.DNull
if resolved.Unix() != 0 {
resolvedDatum = eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()})
resolvedDatum = shortenLogical(eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()}))
}

if err := addRow(
tree.NewDInt(tree.DInt(s.StreamID)),
tree.NewDString(fmt.Sprintf("%d[%d]", s.Spec.ConsumerNode, s.Spec.ConsumerProc)),
tree.NewDInt(tree.DInt(len(s.Spec.Spans))),
eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp),
eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp)),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp)),

tree.NewDInt(tree.DInt(s.Flushes.Batches.Load())),
tree.NewDInt(tree.DInt(s.Flushes.Checkpoints.Load())),
tree.NewDFloat(tree.DFloat(math.Round(float64(s.Flushes.Bytes.Load())/float64(1<<18))/4)),
age(time.UnixMicro(s.LastCheckpoint.Micros.Load())),

tree.NewDInterval(
duration.MakeDuration(s.Flushes.ProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.EmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastEmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),

tree.NewDInt(tree.DInt(s.RF.Checkpoints.Load())),
tree.NewDInt(tree.DInt(s.RF.Advances.Load())),
age(time.UnixMicro(s.RF.LastAdvanceMicros.Load())),
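As a standalone illustration of the `shortenLogical` trick above: when the logical (fractional) part of an HLC decimal is zero, re-quantizing to one decimal place turns the ten trailing zeros into a single `.0`. The sample timestamp value below is made up; the `apd` calls follow the library's public API as used in the diff.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/apd/v3"
)

func main() {
	ctx := apd.BaseContext.WithPrecision(30)

	// An HLC timestamp rendered as a decimal: a zero logical component shows
	// up as ten trailing zeros.
	d, _, err := apd.NewFromString("1718000000000000000.0000000000")
	if err != nil {
		panic(err)
	}

	// If the fractional (logical) part is zero, re-quantize to a single
	// decimal place so the value prints as ".0".
	var frac apd.Decimal
	d.Modf(nil, &frac)
	if frac.IsZero() {
		var out apd.Decimal
		if _, err := ctx.Quantize(&out, d, -1); err == nil {
			d = &out
		}
	}
	fmt.Println(d.Text('f')) // 1718000000000000000.0
}
```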
