diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index e928878e0a38..54e5846e8e14 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -346,30 +346,6 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.NetworkBytesSentGroupedByNode[originInstanceID] += bytes } - // The row execution flow attaches flow stats to a stream stat with the - // last outbox, so we need to check stream stats for max memory and disk - // usage. - // TODO(cathymw): maxMemUsage shouldn't be attached to span stats that - // are associated with streams, since it's a flow level stat. However, - // due to the row exec engine infrastructure, it is too complicated to - // attach this to a flow level span. If the row exec engine gets - // removed, getting maxMemUsage from streamStats should be removed as - // well. - if stats.stats.FlowStats.MaxMemUsage.HasValue() { - memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()) - if memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] = memUsage - } - } - if stats.stats.FlowStats.MaxDiskUsage.HasValue() { - if diskUsage := int64(stats.stats.FlowStats.MaxDiskUsage.Value()); diskUsage > a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] { - a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] = diskUsage - } - } - if stats.stats.FlowStats.ConsumedRU.HasValue() { - a.nodeLevelStats.RUEstimateGroupedByNode[originInstanceID] += int64(stats.stats.FlowStats.ConsumedRU.Value()) - } - numMessages, err := getNumNetworkMessagesFromComponentsStats(stats.stats) if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating number of network messages")) diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index d3e6f65af473..8bee6c7717ff 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -80,7 +80,7 @@ type Outbox struct { } statsCollectionEnabled bool - stats execinfrapb.ComponentStats + streamStats, flowStats execinfrapb.ComponentStats // numOutboxes is an atomic that keeps track of how many outboxes are left. // When there is one outbox left, the flow-level stats are added to the last @@ -109,7 +109,8 @@ func NewOutbox( m.streamID = streamID m.numOutboxes = numOutboxes m.isGatewayNode = isGatewayNode - m.stats.Component = flowCtx.StreamComponentID(streamID) + m.streamStats.Component = flowCtx.StreamComponentID(streamID) + m.flowStats.Component = execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID) return m } @@ -149,7 +150,7 @@ func (m *Outbox) AddRow( mustFlush = true } if m.statsCollectionEnabled { - m.stats.NetTx.TuplesSent.Add(1) + m.streamStats.NetTx.TuplesSent.Add(1) } } m.numRows++ @@ -178,8 +179,8 @@ func (m *Outbox) flush(ctx context.Context) error { } sendErr := m.stream.Send(msg) if m.statsCollectionEnabled { - m.stats.NetTx.BytesSent.Add(int64(msg.Size())) - m.stats.NetTx.MessagesSent.Add(1) + m.streamStats.NetTx.BytesSent.Add(int64(msg.Size())) + m.streamStats.NetTx.MessagesSent.Add(1) } for _, rpm := range msg.Data.Metadata { if metricsMeta, ok := rpm.Value.(*execinfrapb.RemoteProducerMetadata_Metrics_); ok { @@ -310,15 +311,12 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error return err } if !m.isGatewayNode && m.numOutboxes != nil && atomic.AddInt32(m.numOutboxes, -1) == 0 { - // TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams, - // since it's a flow level stat. However, due to the row exec engine infrastructure, it is too - // complicated to attach this to a flow level span. If the row exec engine gets removed, getting - // maxMemUsage from streamStats should be removed as well. - m.stats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes())) - m.stats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes())) - m.stats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx))) + m.flowStats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes())) + m.flowStats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes())) + m.flowStats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx))) } - span.RecordStructured(&m.stats) + span.RecordStructured(&m.streamStats) + span.RecordStructured(&m.flowStats) if !m.flowCtx.Gateway { if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil { err := m.AddRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace}) diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index e95779bfeca6..8aca9691bf6c 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -657,38 +657,38 @@ t6_expr_idx1 false i 1 0 false false mv1 false m 2 0 false true mv1_pkey false i 1 0 false false -query TBBBITT colnames,rowsort -SELECT relname, relhasrules, relhastriggers, relhassubclass, relfrozenxid, relacl, reloptions +query TBBBITTT colnames,rowsort +SELECT relname, relhasrules, relhastriggers, relhassubclass, relfrozenxid, relacl, reloptions, relreplident FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = 'public' ---- -relname relhasrules relhastriggers relhassubclass relfrozenxid relacl reloptions -t1 false false false 0 NULL NULL -t1_pkey false false false 0 NULL NULL -t1_a_key false false false 0 NULL NULL -index_key false false false 0 NULL NULL -t1_m_seq false false false 0 NULL NULL -t1_n_seq false false false 0 NULL NULL -t2 false false false 0 NULL NULL -t2_pkey false false false 0 NULL NULL -t2_t1_id_idx false false false 0 NULL NULL -t3 false false false 0 NULL NULL -t3_pkey false false false 0 NULL NULL -t3_a_b_idx false false false 0 NULL NULL -v1 false false false 0 NULL NULL -t4 false false false 0 NULL NULL -t4_pkey false false false 0 NULL NULL -t5 false false false 0 NULL NULL -t5_pkey false false false 0 NULL NULL -t6 false false false 0 NULL NULL -t6_pkey false false false 0 NULL NULL -t6_expr_idx false false false 0 NULL NULL -t6_expr_expr1_idx false false false 0 NULL NULL -t6_expr_key false false false 0 NULL NULL -t6_expr_idx1 false false false 0 NULL NULL -mv1 false false false 0 NULL NULL -mv1_pkey false false false 0 NULL NULL +relname relhasrules relhastriggers relhassubclass relfrozenxid relacl reloptions relreplident +t1 false false false 0 NULL NULL d +t1_pkey false false false 0 NULL NULL n +t1_a_key false false false 0 NULL NULL n +index_key false false false 0 NULL NULL n +t1_m_seq false false false 0 NULL NULL n +t1_n_seq false false false 0 NULL NULL n +t2 false false false 0 NULL NULL d +t2_pkey false false false 0 NULL NULL n +t2_t1_id_idx false false false 0 NULL NULL n +t3 false false false 0 NULL NULL d +t3_pkey false false false 0 NULL NULL n +t3_a_b_idx false false false 0 NULL NULL n +v1 false false false 0 NULL NULL n +t4 false false false 0 NULL NULL d +t4_pkey false false false 0 NULL NULL n +t5 false false false 0 NULL NULL d +t5_pkey false false false 0 NULL NULL n +t6 false false false 0 NULL NULL d +t6_pkey false false false 0 NULL NULL n +t6_expr_idx false false false 0 NULL NULL n +t6_expr_expr1_idx false false false 0 NULL NULL n +t6_expr_key false false false 0 NULL NULL n +t6_expr_idx1 false false false 0 NULL NULL n +mv1 false false false 0 NULL NULL d +mv1_pkey false false false 0 NULL NULL n ## pg_catalog.pg_attribute @@ -924,20 +924,20 @@ CREATE MATERIALIZED VIEW mv_test AS SELECT 1 statement ok CREATE SEQUENCE seq_test -query TT -SELECT relname, relkind +query TTT +SELECT relname, relkind, relreplident FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = 'public' ORDER BY relname ---- -mv_test m -mv_test_pkey i -seq_test S -tbl_test r -tbl_test_pkey i -tbl_test_v_idx i -view_test v +mv_test m d +mv_test_pkey i n +seq_test S n +tbl_test r d +tbl_test_pkey i n +tbl_test_v_idx i n +view_test v n statement ok DROP DATABASE relkinds diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go index 695278cb3d80..989d5913cbfd 100644 --- a/pkg/sql/pg_catalog.go +++ b/pkg/sql/pg_catalog.go @@ -664,15 +664,19 @@ https://www.postgresql.org/docs/9.5/catalog-pg-class.html`, // The only difference between tables, views and sequences are the relkind and relam columns. relKind := relKindTable relAm := forwardIndexOid + replIdent := "d" // default; if table.IsView() { relKind = relKindView if table.MaterializedView() { relKind = relKindMaterializedView + } else { + replIdent = "n" } relAm = oidZero } else if table.IsSequence() { relKind = relKindSequence relAm = oidZero + replIdent = "n" } relPersistence := relPersistencePermanent if table.IsTemporary() { @@ -724,13 +728,13 @@ https://www.postgresql.org/docs/9.5/catalog-pg-class.html`, tree.DNull, // relacl relOptions, // reloptions // These columns were automatically created by pg_catalog_test's missing column generator. - tree.DNull, // relforcerowsecurity - tree.DNull, // relispartition - tree.DNull, // relispopulated - tree.DNull, // relreplident - tree.DNull, // relrewrite - tree.DNull, // relrowsecurity - tree.DNull, // relpartbound + tree.DNull, // relforcerowsecurity + tree.DNull, // relispartition + tree.DNull, // relispopulated + tree.NewDString(replIdent), // relreplident + tree.DNull, // relrewrite + tree.DNull, // relrowsecurity + tree.DNull, // relpartbound // These columns were automatically created by pg_catalog_test's missing column generator. tree.DNull, // relminmxid ); err != nil { @@ -784,13 +788,13 @@ https://www.postgresql.org/docs/9.5/catalog-pg-class.html`, tree.DNull, // relacl tree.DNull, // reloptions // These columns were automatically created by pg_catalog_test's missing column generator. - tree.DNull, // relforcerowsecurity - tree.DNull, // relispartition - tree.DNull, // relispopulated - tree.DNull, // relreplident - tree.DNull, // relrewrite - tree.DNull, // relrowsecurity - tree.DNull, // relpartbound + tree.DNull, // relforcerowsecurity + tree.DNull, // relispartition + tree.DNull, // relispopulated + tree.NewDString("n"), // relreplident + tree.DNull, // relrewrite + tree.DNull, // relrowsecurity + tree.DNull, // relpartbound // These columns were automatically created by pg_catalog_test's missing column generator. tree.DNull, // relminmxid )