diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 22eb01c7e291..b3fe850b538c 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -1333,6 +1333,7 @@
APPLICATION | kv.protectedts.reconciliation.records_removed | number of records removed during reconciliation runs on this node | Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.admit_latency | Event admission latency: a difference between event MVCC timestamp and the time it was admitted into ingestion processor | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | logical_replication.batch_bytes | Number of bytes in a given batch | Bytes | HISTOGRAM | BYTES | AVG | NONE |
+APPLICATION | logical_replication.batch_hist_nanos | Time spent flushing a batch | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | logical_replication.checkpoint_events_ingested | Checkpoint events ingested by all replication jobs | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.commit_latency | Event commit latency: a difference between event MVCC timestamp and the time it was flushed into disk. If we batch events, then the difference between the oldest event in the batch and flush is recorded | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | logical_replication.distsql_replan_count | Total number of dist sql replanning events | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
@@ -1342,7 +1343,7 @@
APPLICATION | logical_replication.flush_on_size | Number of flushes caused by hitting the buffer size limit | Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.flush_on_time | Number of flushes caused by hitting the time limit | Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.flush_row_count | Number of rows in a given flush | Rows | HISTOGRAM | COUNT | AVG | NONE |
-APPLICATION | logical_replication.flush_wait_nanos | Time spent flushing a batch | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
+APPLICATION | logical_replication.flush_wait_nanos | Time spenting waiting for an in-progress flush | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | logical_replication.flushes | Total flushes across all replication jobs | Flushes | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.job_progress_updates | Total number of updates to the ingestion job progress | Job Updates | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | logical_replication.logical_bytes | Logical bytes (sum of keys + values) ingested by all replication jobs | Bytes | COUNTER | BYTES | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/ccl/streamingccl/logical/logical_replication_job_test.go b/pkg/ccl/streamingccl/logical/logical_replication_job_test.go
index 2ee892b4d549..580fedf245e8 100644
--- a/pkg/ccl/streamingccl/logical/logical_replication_job_test.go
+++ b/pkg/ccl/streamingccl/logical/logical_replication_job_test.go
@@ -48,8 +48,8 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
serverB := testcluster.StartTestCluster(t, 1, clusterArgs)
defer serverB.Stopper().Stop(ctx)
- source := sqlutils.MakeSQLRunner(serverA.Server(0).ApplicationLayer().SQLConn(t))
- target := sqlutils.MakeSQLRunner(serverB.Server(0).ApplicationLayer().SQLConn(t))
+ serverASQL := sqlutils.MakeSQLRunner(serverA.Server(0).ApplicationLayer().SQLConn(t))
+ serverBSQL := sqlutils.MakeSQLRunner(serverB.Server(0).ApplicationLayer().SQLConn(t))
for _, s := range []string{
"SET CLUSTER SETTING kv.rangefeed.enabled = true",
@@ -65,17 +65,17 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
"SET CLUSTER SETTING logical_replication.consumer.minimum_flush_interval = '10ms'",
"SET CLUSTER SETTING logical_replication.consumer.timestamp_granularity = '100ms'",
} {
- source.Exec(t, s)
- target.Exec(t, s)
+ serverASQL.Exec(t, s)
+ serverBSQL.Exec(t, s)
}
- source.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
- target.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
- source.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
- target.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
+ serverASQL.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
+ serverBSQL.Exec(t, "CREATE TABLE tab (pk int primary key, payload string)")
+ serverASQL.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
+ serverBSQL.Exec(t, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
- source.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
- target.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')")
+ serverASQL.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
+ serverBSQL.Exec(t, "INSERT INTO tab VALUES (1, 'goodbye')")
serverAURL, cleanup := sqlutils.PGUrl(t, serverA.Server(0).ApplicationLayer().SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
@@ -86,31 +86,31 @@ func TestLogicalStreamIngestionJob(t *testing.T) {
jobAID jobspb.JobID
jobBID jobspb.JobID
)
- target.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverAURL.String(), `ARRAY['tab']`)).Scan(&jobAID)
- source.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverBURL.String(), `ARRAY['tab']`)).Scan(&jobBID)
+ serverASQL.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverBURL.String(), `ARRAY['tab']`)).Scan(&jobAID)
+ serverBSQL.QueryRow(t, fmt.Sprintf("SELECT crdb_internal.start_logical_replication_job('%s', %s)", serverAURL.String(), `ARRAY['tab']`)).Scan(&jobBID)
+
+ now := serverA.Server(0).Clock().Now()
t.Logf("waiting for replication job %d", jobAID)
- WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), target, jobAID)
+ WaitUntilReplicatedTime(t, now, serverASQL, jobAID)
t.Logf("waiting for replication job %d", jobBID)
- WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), source, jobBID)
+ WaitUntilReplicatedTime(t, now, serverBSQL, jobBID)
- source.Exec(t, "INSERT INTO tab VALUES (2, 'potato')")
- target.Exec(t, "INSERT INTO tab VALUES (3, 'celeriac')")
- source.Exec(t, "UPSERT INTO tab VALUES (1, 'hello, again')")
- target.Exec(t, "UPSERT INTO tab VALUES (1, 'goodbye, again')")
+ serverASQL.Exec(t, "INSERT INTO tab VALUES (2, 'potato')")
+ serverBSQL.Exec(t, "INSERT INTO tab VALUES (3, 'celeriac')")
+ serverASQL.Exec(t, "UPSERT INTO tab VALUES (1, 'hello, again')")
+ serverBSQL.Exec(t, "UPSERT INTO tab VALUES (1, 'goodbye, again')")
- WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), target, jobAID)
- WaitUntilReplicatedTime(t, serverA.Server(0).Clock().Now(), source, jobBID)
+ now = serverA.Server(0).Clock().Now()
+ WaitUntilReplicatedTime(t, now, serverASQL, jobAID)
+ WaitUntilReplicatedTime(t, now, serverBSQL, jobBID)
- target.CheckQueryResults(t, "SELECT * from tab", [][]string{
+ expectedRows := [][]string{
{"1", "goodbye, again"},
{"2", "potato"},
{"3", "celeriac"},
- })
- source.CheckQueryResults(t, "SELECT * from tab", [][]string{
- {"1", "goodbye, again"},
- {"2", "potato"},
- {"3", "celeriac"},
- })
+ }
+ serverBSQL.CheckQueryResults(t, "SELECT * from tab", expectedRows)
+ serverASQL.CheckQueryResults(t, "SELECT * from tab", expectedRows)
}
func WaitUntilReplicatedTime(
diff --git a/pkg/ccl/streamingccl/logical/metrics.go b/pkg/ccl/streamingccl/logical/metrics.go
index 668885475f2b..b2739a18cce1 100644
--- a/pkg/ccl/streamingccl/logical/metrics.go
+++ b/pkg/ccl/streamingccl/logical/metrics.go
@@ -121,7 +121,7 @@ var (
Unit: metric.Unit_BYTES,
}
metaReplicationBatchHistNanos = metric.Metadata{
- Name: "logical_replication.flush_wait_nanos",
+ Name: "logical_replication.batch_hist_nanos",
Help: "Time spent flushing a batch",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go
index 10966a505a82..4c9ca3291f8c 100644
--- a/pkg/ccl/streamingccl/streamproducer/event_stream.go
+++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -72,6 +72,8 @@ type eventStream struct {
lastCheckpointTime time.Time
lastCheckpointLen int
+ lastPolled time.Time
+
debug streampb.DebugProducerStatus
}
@@ -111,6 +113,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
return errors.AssertionFailedf("expected to be started once")
}
+ s.lastPolled = timeutil.Now()
+
sourceTenantID, err := s.validateProducerJobAndSpec(ctx)
if err != nil {
return err
@@ -215,6 +219,12 @@ func (s *eventStream) setErr(err error) bool {
// Next implements eval.ValueGenerator interface.
func (s *eventStream) Next(ctx context.Context) (bool, error) {
+ emitWait := int64(timeutil.Since(s.lastPolled))
+
+ s.debug.Flushes.LastEmitWaitNanos.Store(emitWait)
+ s.debug.Flushes.EmitWaitNanos.Add(emitWait)
+ s.lastPolled = timeutil.Now()
+
select {
case <-ctx.Done():
return false, ctx.Err()
@@ -226,6 +236,10 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) {
case err := <-s.errCh:
return false, err
default:
+ produceWait := int64(timeutil.Since(s.lastPolled))
+ s.debug.Flushes.ProduceWaitNanos.Add(produceWait)
+ s.debug.Flushes.LastProduceWaitNanos.Store(produceWait)
+ s.lastPolled = timeutil.Now()
return true, nil
}
}
diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go
index dd08c127867f..d31ef80dee5b 100644
--- a/pkg/cli/zip_table_registry.go
+++ b/pkg/cli/zip_table_registry.go
@@ -384,6 +384,10 @@ var zipInternalTablesPerCluster = DebugZipTableRegistry{
"batches",
"checkpoints",
"megabytes",
+ "produce_wait",
+ "emit_wait",
+ "last_produce_wait",
+ "last_emit_wait",
"last_checkpoint",
"rf_checkpoints",
"rf_advances",
diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index 66ea9c47b29c..141fee3b9061 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -701,7 +701,9 @@ var errNodeAlreadyLive = errors.New("node already live")
//
// If this method returns nil, the node's liveness has been extended,
// relative to the previous value. It may or may not still be alive
-// when this method returns.
+// when this method returns. It may also not have been extended as far
+// as the livenessThreshold, because the caller may have raced with
+// another heartbeater.
//
// On failure, this method returns ErrEpochIncremented, although this
// may not necessarily mean that the epoch was actually incremented.
@@ -839,17 +841,6 @@ func (nl *NodeLiveness) heartbeatInternal(
// expired while in flight, so maybe we don't have to care about
// that and only need to distinguish between same and different
// epochs in our return value.
- //
- // TODO(nvanbenschoten): Unlike the early return above, this doesn't
- // guarantee that the resulting expiration is past minExpiration,
- // only that it's different than our oldLiveness. Is that ok? It
- // hasn't caused issues so far, but we might want to detect this
- // case and retry, at least in the case of the liveness heartbeat
- // loop. The downside of this is that a heartbeat that's intending
- // to bump the expiration of a record out 9s into the future may
- // return a success even if the expiration is only 5 seconds in the
- // future. The next heartbeat will then start with only 0.5 seconds
- // before expiration.
if actual.IsLive(nl.clock.Now()) && !incrementEpoch {
return errNodeAlreadyLive
}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 6fa9cc147527..5fc8b2f42b9e 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -538,20 +538,31 @@ func (p *pendingLeaseRequest) requestLease(
// interval are the same.
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
- err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness)
- if err != nil {
- if logFailedHeartbeatOwnLiveness.ShouldLog() {
- log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
+ curLiveness := reqLeaseLiveness
+ for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
+ err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, curLiveness)
+ if err != nil {
+ if logFailedHeartbeatOwnLiveness.ShouldLog() {
+ log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
+ }
+ return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
+ fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
- return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
- fmt.Sprintf("failed to manipulate liveness record: %s", err))
- }
- // Assert that the liveness record expiration is now greater than the
- // expiration of the lease we're promoting.
- l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
- if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
- return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+
- "expiration of the previous lease %s after liveness heartbeat", l, status.Lease)
+ // Check whether the liveness record expiration is now greater than the
+ // expiration of the lease we're promoting. If not, we may have raced with
+ // another liveness heartbeat which did not extend the liveness expiration
+ // far enough and we should try again.
+ l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
+ if !ok {
+ return errors.NewAssertionErrorWithWrappedErrf(liveness.ErrRecordCacheMiss, "after heartbeat")
+ }
+ if l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
+ log.Infof(ctx, "expiration of liveness record %s is not greater than "+
+ "expiration of the previous lease %s after liveness heartbeat, retrying...", l, status.Lease)
+ curLiveness = l.Liveness
+ continue
+ }
+ break
}
}
diff --git a/pkg/repstream/streampb/empty.go b/pkg/repstream/streampb/empty.go
index 5cd9247b2f4a..1c4e1a804a1e 100644
--- a/pkg/repstream/streampb/empty.go
+++ b/pkg/repstream/streampb/empty.go
@@ -24,7 +24,8 @@ type DebugProducerStatus struct {
ResolvedMicros atomic.Int64
}
Flushes struct {
- Batches, Checkpoints, Bytes atomic.Int64
+ Batches, Checkpoints, Bytes, EmitWaitNanos, ProduceWaitNanos atomic.Int64
+ LastProduceWaitNanos, LastEmitWaitNanos atomic.Int64
}
LastCheckpoint struct {
Micros atomic.Int64
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index a930bb141fa2..64b044e4859d 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -23,6 +23,7 @@ import (
"strings"
"time"
+ "github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -9028,6 +9029,11 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
megabytes FLOAT,
last_checkpoint INTERVAL,
+ produce_wait INTERVAL,
+ emit_wait INTERVAL,
+ last_produce_wait INTERVAL,
+ last_emit_wait INTERVAL,
+
rf_checkpoints INT,
rf_advances INT,
rf_last_advance INTERVAL,
@@ -9052,25 +9058,54 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
return tree.NewDInterval(duration.Age(now, t), types.DefaultIntervalTypeMetadata)
}
+ // Transform `.0000000000` to `.0` to shorted/de-noise HLCs.
+ shortenLogical := func(d *tree.DDecimal) *tree.DDecimal {
+ var tmp apd.Decimal
+ d.Modf(nil, &tmp)
+ if tmp.IsZero() {
+ if _, err := tree.DecimalCtx.Quantize(&tmp, &d.Decimal, -1); err == nil {
+ d.Decimal = tmp
+ }
+ }
+ return d
+ }
+
for _, s := range sm.DebugGetProducerStatuses(ctx) {
resolved := time.UnixMicro(s.RF.ResolvedMicros.Load())
resolvedDatum := tree.DNull
if resolved.Unix() != 0 {
- resolvedDatum = eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()})
+ resolvedDatum = shortenLogical(eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()}))
}
if err := addRow(
tree.NewDInt(tree.DInt(s.StreamID)),
tree.NewDString(fmt.Sprintf("%d[%d]", s.Spec.ConsumerNode, s.Spec.ConsumerProc)),
tree.NewDInt(tree.DInt(len(s.Spec.Spans))),
- eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp),
- eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp),
+ shortenLogical(eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp)),
+ shortenLogical(eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp)),
tree.NewDInt(tree.DInt(s.Flushes.Batches.Load())),
tree.NewDInt(tree.DInt(s.Flushes.Checkpoints.Load())),
tree.NewDFloat(tree.DFloat(math.Round(float64(s.Flushes.Bytes.Load())/float64(1<<18))/4)),
age(time.UnixMicro(s.LastCheckpoint.Micros.Load())),
+ tree.NewDInterval(
+ duration.MakeDuration(s.Flushes.ProduceWaitNanos.Load(), 0, 0),
+ types.DefaultIntervalTypeMetadata,
+ ),
+ tree.NewDInterval(
+ duration.MakeDuration(s.Flushes.EmitWaitNanos.Load(), 0, 0),
+ types.DefaultIntervalTypeMetadata,
+ ),
+ tree.NewDInterval(
+ duration.MakeDuration(s.Flushes.LastProduceWaitNanos.Load(), 0, 0),
+ types.DefaultIntervalTypeMetadata,
+ ),
+ tree.NewDInterval(
+ duration.MakeDuration(s.Flushes.LastEmitWaitNanos.Load(), 0, 0),
+ types.DefaultIntervalTypeMetadata,
+ ),
+
tree.NewDInt(tree.DInt(s.RF.Checkpoints.Load())),
tree.NewDInt(tree.DInt(s.RF.Advances.Load())),
age(time.UnixMicro(s.RF.LastAdvanceMicros.Load())),
diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog
index adcbdbf6d75f..094f1ec62436 100644
--- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog
@@ -399,7 +399,7 @@ SELECT id, strip_volatile(descriptor) FROM crdb_internal.kv_catalog_descriptor O
4294967191 {"schema": {"defaultPrivileges": {"type": "SCHEMA"}, "id": 4294967191, "name": "information_schema", "privileges": {"ownerProto": "node", "users": [{"privileges": "512", "userProto": "public"}], "version": 3}, "version": "1"}}
4294967192 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967192, "name": "cluster_replication_node_stream_checkpoints", "nextColumnId": 7, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}
4294967193 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967193, "name": "cluster_replication_node_stream_spans", "nextColumnId": 5, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}
-4294967194 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "spans", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "initial_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "prev_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "megabytes", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 9, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 10, "name": "rf_checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 11, "name": "rf_advances", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "rf_last_advance", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "rf_resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 14, "name": "rf_resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967194, "name": "cluster_replication_node_streams", "nextColumnId": 15, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}
+4294967194 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "spans", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "initial_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "prev_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "megabytes", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 9, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 10, "name": "produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 11, "name": "emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 12, "name": "last_produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 14, "name": "rf_checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 15, "name": "rf_advances", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "rf_last_advance", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 17, "name": "rf_resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 18, "name": "rf_resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967194, "name": "cluster_replication_node_streams", "nextColumnId": 19, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}
4294967195 {"table": {"columns": [{"id": 1, "name": "job_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "start_key", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "end_key", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967195, "name": "cluster_replication_spans", "nextColumnId": 6, "nextConstraintId": 1, "nextMutationId": 1, "primaryIndex": {"foreignKey": {}, "geoConfig": {}, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1", "viewQuery": "WITH spans AS (SELECT j.id AS job_id, jsonb_array_elements(((crdb_internal.pb_to_json('progress', i.value)->'streamIngest')->'checkpoint')->'resolvedSpans') AS s FROM system.jobs AS j LEFT JOIN system.job_info AS i ON (j.id = i.job_id) AND (i.info_key = 'legacy_progress') WHERE j.job_type = 'REPLICATION STREAM INGESTION') SELECT job_id, crdb_internal.pretty_key(decode((s->'span')->>'key', 'base64'), 0) AS start_key, crdb_internal.pretty_key(decode((s->'span')->>'endKey', 'base64'), 0) AS end_key, ((((s->'timestamp')->>'wallTime') || '.') || COALESCE(((s->'timestamp')->'logical'), '0'))::DECIMAL AS resolved, date_trunc('second', ((cluster_logical_timestamp() - ((s->'timestamp')->>'wallTime')::INT8) / 1e9)::INTERVAL) AS resolved_age FROM spans"}}
4294967196 {"table": {"columns": [{"id": 1, "name": "desc_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "version", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 3, "name": "sql_instance_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "session_id", "type": {"family": "BytesFamily", "oid": 17}}, {"id": 5, "name": "crdb_region", "type": {"family": "BytesFamily", "oid": 17}}], "formatVersion": 3, "id": 4294967196, "name": "kv_session_based_leases", "nextColumnId": 6, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}
4294967197 {"table": {"columns": [{"id": 1, "name": "id", "type": {"family": "UuidFamily", "oid": 2950}}, {"id": 2, "name": "ts", "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 3, "name": "meta_type", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "meta", "nullable": true, "type": {"family": "BytesFamily", "oid": 17}}, {"id": 5, "name": "num_spans", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "spans", "type": {"family": "BytesFamily", "oid": 17}}, {"id": 7, "name": "verified", "type": {"oid": 16}}, {"id": 8, "name": "target", "nullable": true, "type": {"family": "BytesFamily", "oid": 17}}, {"id": 9, "name": "decoded_meta", "nullable": true, "type": {"family": "JsonFamily", "oid": 3802}}, {"id": 10, "name": "decoded_target", "nullable": true, "type": {"family": "JsonFamily", "oid": 3802}}, {"id": 11, "name": "internal_meta", "nullable": true, "type": {"family": "JsonFamily", "oid": 3802}}, {"id": 12, "name": "num_ranges", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 13, "name": "last_updated", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}], "formatVersion": 3, "id": 4294967197, "name": "kv_protected_ts_records", "nextColumnId": 14, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}